]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
core: Fix some FIXMEs for the extended API
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/timeval.h>
33 #include <pulse/rtclock.h>
34 #include <pulse/xmalloc.h>
35 #include <pulse/fork-detect.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42 #include <pulsecore/core-util.h>
43
44 #include "internal.h"
45 #include "stream.h"
46
47 #define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48 #define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
49
50 #define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51 #define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52 #define SMOOTHER_MIN_HISTORY (4)
53
54 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
56 }
57
58 static void reset_callbacks(pa_stream *s) {
59 s->read_callback = NULL;
60 s->read_userdata = NULL;
61 s->write_callback = NULL;
62 s->write_userdata = NULL;
63 s->state_callback = NULL;
64 s->state_userdata = NULL;
65 s->overflow_callback = NULL;
66 s->overflow_userdata = NULL;
67 s->underflow_callback = NULL;
68 s->underflow_userdata = NULL;
69 s->latency_update_callback = NULL;
70 s->latency_update_userdata = NULL;
71 s->moved_callback = NULL;
72 s->moved_userdata = NULL;
73 s->suspended_callback = NULL;
74 s->suspended_userdata = NULL;
75 s->started_callback = NULL;
76 s->started_userdata = NULL;
77 s->event_callback = NULL;
78 s->event_userdata = NULL;
79 s->buffer_attr_callback = NULL;
80 s->buffer_attr_userdata = NULL;
81 }
82
83 static pa_stream *pa_stream_new_with_proplist_internal(
84 pa_context *c,
85 const char *name,
86 const pa_sample_spec *ss,
87 const pa_channel_map *map,
88 pa_format_info * const *formats,
89 pa_proplist *p) {
90
91 pa_stream *s;
92 int i;
93
94 pa_assert(c);
95 pa_assert(PA_REFCNT_VALUE(c) >= 1);
96 pa_assert((ss == NULL && map == NULL) || formats == NULL);
97
98 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
100
101 s = pa_xnew(pa_stream, 1);
102 PA_REFCNT_INIT(s);
103 s->context = c;
104 s->mainloop = c->mainloop;
105
106 s->direction = PA_STREAM_NODIRECTION;
107 s->state = PA_STREAM_UNCONNECTED;
108 s->flags = 0;
109
110 if (ss)
111 s->sample_spec = *ss;
112 else
113 s->sample_spec.format = PA_SAMPLE_INVALID;
114
115 if (map)
116 s->channel_map = *map;
117 else
118 pa_channel_map_init(&s->channel_map);
119
120 s->n_formats = 0;
121 if (formats) {
122 for (i = 0; formats[i] && i < PA_MAX_FORMATS; i++) {
123 s->n_formats++;
124 s->req_formats[i] = pa_format_info_copy(formats[i]);
125 }
126 /* Make sure the input array was NULL-terminated */
127 pa_assert(formats[i] == NULL);
128 }
129
130 /* We'll get the final negotiated format after connecting */
131 s->format = NULL;
132
133 s->direct_on_input = PA_INVALID_INDEX;
134
135 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136 if (name)
137 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
138
139 s->channel = 0;
140 s->channel_valid = FALSE;
141 s->syncid = c->csyncid++;
142 s->stream_index = PA_INVALID_INDEX;
143
144 s->requested_bytes = 0;
145 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146
147 /* We initialize der target length here, so that if the user
148 * passes no explicit buffering metrics the default is similar to
149 * what older PA versions provided. */
150
151 s->buffer_attr.maxlength = (uint32_t) -1;
152 if (ss)
153 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154 else {
155 /* FIXME: We assume a worst-case compressed format corresponding to
156 * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
157 pa_sample_spec tmp_ss = {
158 .format = PA_SAMPLE_S16NE,
159 .rate = 48000,
160 .channels = 2,
161 };
162 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163 }
164 s->buffer_attr.minreq = (uint32_t) -1;
165 s->buffer_attr.prebuf = (uint32_t) -1;
166 s->buffer_attr.fragsize = (uint32_t) -1;
167
168 s->device_index = PA_INVALID_INDEX;
169 s->device_name = NULL;
170 s->suspended = FALSE;
171 s->corked = FALSE;
172
173 s->write_memblock = NULL;
174 s->write_data = NULL;
175
176 pa_memchunk_reset(&s->peek_memchunk);
177 s->peek_data = NULL;
178 s->record_memblockq = NULL;
179
180 memset(&s->timing_info, 0, sizeof(s->timing_info));
181 s->timing_info_valid = FALSE;
182
183 s->previous_time = 0;
184
185 s->read_index_not_before = 0;
186 s->write_index_not_before = 0;
187 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188 s->write_index_corrections[i].valid = 0;
189 s->current_write_index_correction = 0;
190
191 s->auto_timing_update_event = NULL;
192 s->auto_timing_update_requested = FALSE;
193 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
194
195 reset_callbacks(s);
196
197 s->smoother = NULL;
198
199 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200 PA_LLIST_PREPEND(pa_stream, c->streams, s);
201 pa_stream_ref(s);
202
203 return s;
204 }
205
206 pa_stream *pa_stream_new_with_proplist(
207 pa_context *c,
208 const char *name,
209 const pa_sample_spec *ss,
210 const pa_channel_map *map,
211 pa_proplist *p) {
212
213 pa_channel_map tmap;
214
215 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
220
221 if (!map)
222 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
223
224 return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, p);
225 }
226
227 pa_stream *pa_stream_new_extended(
228 pa_context *c,
229 const char *name,
230 pa_format_info * const *formats,
231 pa_proplist *p) {
232
233 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
234
235 return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, p);
236 }
237
238 static void stream_unlink(pa_stream *s) {
239 pa_operation *o, *n;
240 pa_assert(s);
241
242 if (!s->context)
243 return;
244
245 /* Detach from context */
246
247 /* Unref all operatio object that point to us */
248 for (o = s->context->operations; o; o = n) {
249 n = o->next;
250
251 if (o->stream == s)
252 pa_operation_cancel(o);
253 }
254
255 /* Drop all outstanding replies for this stream */
256 if (s->context->pdispatch)
257 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
258
259 if (s->channel_valid) {
260 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
261 s->channel = 0;
262 s->channel_valid = FALSE;
263 }
264
265 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
266 pa_stream_unref(s);
267
268 s->context = NULL;
269
270 if (s->auto_timing_update_event) {
271 pa_assert(s->mainloop);
272 s->mainloop->time_free(s->auto_timing_update_event);
273 }
274
275 reset_callbacks(s);
276 }
277
278 static void stream_free(pa_stream *s) {
279 unsigned int i;
280
281 pa_assert(s);
282
283 stream_unlink(s);
284
285 if (s->write_memblock) {
286 pa_memblock_release(s->write_memblock);
287 pa_memblock_unref(s->write_data);
288 }
289
290 if (s->peek_memchunk.memblock) {
291 if (s->peek_data)
292 pa_memblock_release(s->peek_memchunk.memblock);
293 pa_memblock_unref(s->peek_memchunk.memblock);
294 }
295
296 if (s->record_memblockq)
297 pa_memblockq_free(s->record_memblockq);
298
299 if (s->proplist)
300 pa_proplist_free(s->proplist);
301
302 if (s->smoother)
303 pa_smoother_free(s->smoother);
304
305 for (i = 0; i < s->n_formats; i++)
306 pa_xfree(s->req_formats[i]);
307
308 pa_xfree(s->device_name);
309 pa_xfree(s);
310 }
311
312 void pa_stream_unref(pa_stream *s) {
313 pa_assert(s);
314 pa_assert(PA_REFCNT_VALUE(s) >= 1);
315
316 if (PA_REFCNT_DEC(s) <= 0)
317 stream_free(s);
318 }
319
320 pa_stream* pa_stream_ref(pa_stream *s) {
321 pa_assert(s);
322 pa_assert(PA_REFCNT_VALUE(s) >= 1);
323
324 PA_REFCNT_INC(s);
325 return s;
326 }
327
328 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
329 pa_assert(s);
330 pa_assert(PA_REFCNT_VALUE(s) >= 1);
331
332 return s->state;
333 }
334
335 pa_context* pa_stream_get_context(pa_stream *s) {
336 pa_assert(s);
337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
338
339 return s->context;
340 }
341
342 uint32_t pa_stream_get_index(pa_stream *s) {
343 pa_assert(s);
344 pa_assert(PA_REFCNT_VALUE(s) >= 1);
345
346 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
347 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
348
349 return s->stream_index;
350 }
351
352 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
353 pa_assert(s);
354 pa_assert(PA_REFCNT_VALUE(s) >= 1);
355
356 if (s->state == st)
357 return;
358
359 pa_stream_ref(s);
360
361 s->state = st;
362
363 if (s->state_callback)
364 s->state_callback(s, s->state_userdata);
365
366 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
367 stream_unlink(s);
368
369 pa_stream_unref(s);
370 }
371
372 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
373 pa_assert(s);
374 pa_assert(PA_REFCNT_VALUE(s) >= 1);
375
376 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
377 return;
378
379 if (s->state == PA_STREAM_READY &&
380 (force || !s->auto_timing_update_requested)) {
381 pa_operation *o;
382
383 /* pa_log("Automatically requesting new timing data"); */
384
385 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
386 pa_operation_unref(o);
387 s->auto_timing_update_requested = TRUE;
388 }
389 }
390
391 if (s->auto_timing_update_event) {
392 if (force)
393 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
394
395 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
396
397 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
398 }
399 }
400
401 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
402 pa_context *c = userdata;
403 pa_stream *s;
404 uint32_t channel;
405
406 pa_assert(pd);
407 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
408 pa_assert(t);
409 pa_assert(c);
410 pa_assert(PA_REFCNT_VALUE(c) >= 1);
411
412 pa_context_ref(c);
413
414 if (pa_tagstruct_getu32(t, &channel) < 0 ||
415 !pa_tagstruct_eof(t)) {
416 pa_context_fail(c, PA_ERR_PROTOCOL);
417 goto finish;
418 }
419
420 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
421 goto finish;
422
423 if (s->state != PA_STREAM_READY)
424 goto finish;
425
426 pa_context_set_error(c, PA_ERR_KILLED);
427 pa_stream_set_state(s, PA_STREAM_FAILED);
428
429 finish:
430 pa_context_unref(c);
431 }
432
433 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
434 pa_usec_t x;
435
436 pa_assert(s);
437 pa_assert(!force_start || !force_stop);
438
439 if (!s->smoother)
440 return;
441
442 x = pa_rtclock_now();
443
444 if (s->timing_info_valid) {
445 if (aposteriori)
446 x -= s->timing_info.transport_usec;
447 else
448 x += s->timing_info.transport_usec;
449 }
450
451 if (s->suspended || s->corked || force_stop)
452 pa_smoother_pause(s->smoother, x);
453 else if (force_start || s->buffer_attr.prebuf == 0) {
454
455 if (!s->timing_info_valid &&
456 !aposteriori &&
457 !force_start &&
458 !force_stop &&
459 s->context->version >= 13) {
460
461 /* If the server supports STARTED events we take them as
462 * indications when audio really starts/stops playing, if
463 * we don't have any timing info yet -- instead of trying
464 * to be smart and guessing the server time. Otherwise the
465 * unknown transport delay add too much noise to our time
466 * calculations. */
467
468 return;
469 }
470
471 pa_smoother_resume(s->smoother, x, TRUE);
472 }
473
474 /* Please note that we have no idea if playback actually started
475 * if prebuf is non-zero! */
476 }
477
478 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
479 pa_context *c = userdata;
480 pa_stream *s;
481 uint32_t channel;
482 const char *dn;
483 pa_bool_t suspended;
484 uint32_t di;
485 pa_usec_t usec = 0;
486 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
487
488 pa_assert(pd);
489 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
490 pa_assert(t);
491 pa_assert(c);
492 pa_assert(PA_REFCNT_VALUE(c) >= 1);
493
494 pa_context_ref(c);
495
496 if (c->version < 12) {
497 pa_context_fail(c, PA_ERR_PROTOCOL);
498 goto finish;
499 }
500
501 if (pa_tagstruct_getu32(t, &channel) < 0 ||
502 pa_tagstruct_getu32(t, &di) < 0 ||
503 pa_tagstruct_gets(t, &dn) < 0 ||
504 pa_tagstruct_get_boolean(t, &suspended) < 0) {
505 pa_context_fail(c, PA_ERR_PROTOCOL);
506 goto finish;
507 }
508
509 if (c->version >= 13) {
510
511 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
512 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
513 pa_tagstruct_getu32(t, &fragsize) < 0 ||
514 pa_tagstruct_get_usec(t, &usec) < 0) {
515 pa_context_fail(c, PA_ERR_PROTOCOL);
516 goto finish;
517 }
518 } else {
519 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
520 pa_tagstruct_getu32(t, &tlength) < 0 ||
521 pa_tagstruct_getu32(t, &prebuf) < 0 ||
522 pa_tagstruct_getu32(t, &minreq) < 0 ||
523 pa_tagstruct_get_usec(t, &usec) < 0) {
524 pa_context_fail(c, PA_ERR_PROTOCOL);
525 goto finish;
526 }
527 }
528 }
529
530 if (!pa_tagstruct_eof(t)) {
531 pa_context_fail(c, PA_ERR_PROTOCOL);
532 goto finish;
533 }
534
535 if (!dn || di == PA_INVALID_INDEX) {
536 pa_context_fail(c, PA_ERR_PROTOCOL);
537 goto finish;
538 }
539
540 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
541 goto finish;
542
543 if (s->state != PA_STREAM_READY)
544 goto finish;
545
546 if (c->version >= 13) {
547 if (s->direction == PA_STREAM_RECORD)
548 s->timing_info.configured_source_usec = usec;
549 else
550 s->timing_info.configured_sink_usec = usec;
551
552 s->buffer_attr.maxlength = maxlength;
553 s->buffer_attr.fragsize = fragsize;
554 s->buffer_attr.tlength = tlength;
555 s->buffer_attr.prebuf = prebuf;
556 s->buffer_attr.minreq = minreq;
557 }
558
559 pa_xfree(s->device_name);
560 s->device_name = pa_xstrdup(dn);
561 s->device_index = di;
562
563 s->suspended = suspended;
564
565 check_smoother_status(s, TRUE, FALSE, FALSE);
566 request_auto_timing_update(s, TRUE);
567
568 if (s->moved_callback)
569 s->moved_callback(s, s->moved_userdata);
570
571 finish:
572 pa_context_unref(c);
573 }
574
575 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
576 pa_context *c = userdata;
577 pa_stream *s;
578 uint32_t channel;
579 pa_usec_t usec = 0;
580 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
581
582 pa_assert(pd);
583 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
584 pa_assert(t);
585 pa_assert(c);
586 pa_assert(PA_REFCNT_VALUE(c) >= 1);
587
588 pa_context_ref(c);
589
590 if (c->version < 15) {
591 pa_context_fail(c, PA_ERR_PROTOCOL);
592 goto finish;
593 }
594
595 if (pa_tagstruct_getu32(t, &channel) < 0) {
596 pa_context_fail(c, PA_ERR_PROTOCOL);
597 goto finish;
598 }
599
600 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
601 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
602 pa_tagstruct_getu32(t, &fragsize) < 0 ||
603 pa_tagstruct_get_usec(t, &usec) < 0) {
604 pa_context_fail(c, PA_ERR_PROTOCOL);
605 goto finish;
606 }
607 } else {
608 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
609 pa_tagstruct_getu32(t, &tlength) < 0 ||
610 pa_tagstruct_getu32(t, &prebuf) < 0 ||
611 pa_tagstruct_getu32(t, &minreq) < 0 ||
612 pa_tagstruct_get_usec(t, &usec) < 0) {
613 pa_context_fail(c, PA_ERR_PROTOCOL);
614 goto finish;
615 }
616 }
617
618 if (!pa_tagstruct_eof(t)) {
619 pa_context_fail(c, PA_ERR_PROTOCOL);
620 goto finish;
621 }
622
623 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
624 goto finish;
625
626 if (s->state != PA_STREAM_READY)
627 goto finish;
628
629 if (s->direction == PA_STREAM_RECORD)
630 s->timing_info.configured_source_usec = usec;
631 else
632 s->timing_info.configured_sink_usec = usec;
633
634 s->buffer_attr.maxlength = maxlength;
635 s->buffer_attr.fragsize = fragsize;
636 s->buffer_attr.tlength = tlength;
637 s->buffer_attr.prebuf = prebuf;
638 s->buffer_attr.minreq = minreq;
639
640 request_auto_timing_update(s, TRUE);
641
642 if (s->buffer_attr_callback)
643 s->buffer_attr_callback(s, s->buffer_attr_userdata);
644
645 finish:
646 pa_context_unref(c);
647 }
648
649 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
650 pa_context *c = userdata;
651 pa_stream *s;
652 uint32_t channel;
653 pa_bool_t suspended;
654
655 pa_assert(pd);
656 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
657 pa_assert(t);
658 pa_assert(c);
659 pa_assert(PA_REFCNT_VALUE(c) >= 1);
660
661 pa_context_ref(c);
662
663 if (c->version < 12) {
664 pa_context_fail(c, PA_ERR_PROTOCOL);
665 goto finish;
666 }
667
668 if (pa_tagstruct_getu32(t, &channel) < 0 ||
669 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
670 !pa_tagstruct_eof(t)) {
671 pa_context_fail(c, PA_ERR_PROTOCOL);
672 goto finish;
673 }
674
675 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
676 goto finish;
677
678 if (s->state != PA_STREAM_READY)
679 goto finish;
680
681 s->suspended = suspended;
682
683 check_smoother_status(s, TRUE, FALSE, FALSE);
684 request_auto_timing_update(s, TRUE);
685
686 if (s->suspended_callback)
687 s->suspended_callback(s, s->suspended_userdata);
688
689 finish:
690 pa_context_unref(c);
691 }
692
693 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
694 pa_context *c = userdata;
695 pa_stream *s;
696 uint32_t channel;
697
698 pa_assert(pd);
699 pa_assert(command == PA_COMMAND_STARTED);
700 pa_assert(t);
701 pa_assert(c);
702 pa_assert(PA_REFCNT_VALUE(c) >= 1);
703
704 pa_context_ref(c);
705
706 if (c->version < 13) {
707 pa_context_fail(c, PA_ERR_PROTOCOL);
708 goto finish;
709 }
710
711 if (pa_tagstruct_getu32(t, &channel) < 0 ||
712 !pa_tagstruct_eof(t)) {
713 pa_context_fail(c, PA_ERR_PROTOCOL);
714 goto finish;
715 }
716
717 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
718 goto finish;
719
720 if (s->state != PA_STREAM_READY)
721 goto finish;
722
723 check_smoother_status(s, TRUE, TRUE, FALSE);
724 request_auto_timing_update(s, TRUE);
725
726 if (s->started_callback)
727 s->started_callback(s, s->started_userdata);
728
729 finish:
730 pa_context_unref(c);
731 }
732
733 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
734 pa_context *c = userdata;
735 pa_stream *s;
736 uint32_t channel;
737 pa_proplist *pl = NULL;
738 const char *event;
739
740 pa_assert(pd);
741 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
742 pa_assert(t);
743 pa_assert(c);
744 pa_assert(PA_REFCNT_VALUE(c) >= 1);
745
746 pa_context_ref(c);
747
748 if (c->version < 15) {
749 pa_context_fail(c, PA_ERR_PROTOCOL);
750 goto finish;
751 }
752
753 pl = pa_proplist_new();
754
755 if (pa_tagstruct_getu32(t, &channel) < 0 ||
756 pa_tagstruct_gets(t, &event) < 0 ||
757 pa_tagstruct_get_proplist(t, pl) < 0 ||
758 !pa_tagstruct_eof(t) || !event) {
759 pa_context_fail(c, PA_ERR_PROTOCOL);
760 goto finish;
761 }
762
763 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
764 goto finish;
765
766 if (s->state != PA_STREAM_READY)
767 goto finish;
768
769 if (s->event_callback)
770 s->event_callback(s, event, pl, s->event_userdata);
771
772 finish:
773 pa_context_unref(c);
774
775 if (pl)
776 pa_proplist_free(pl);
777 }
778
779 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
780 pa_stream *s;
781 pa_context *c = userdata;
782 uint32_t bytes, channel;
783
784 pa_assert(pd);
785 pa_assert(command == PA_COMMAND_REQUEST);
786 pa_assert(t);
787 pa_assert(c);
788 pa_assert(PA_REFCNT_VALUE(c) >= 1);
789
790 pa_context_ref(c);
791
792 if (pa_tagstruct_getu32(t, &channel) < 0 ||
793 pa_tagstruct_getu32(t, &bytes) < 0 ||
794 !pa_tagstruct_eof(t)) {
795 pa_context_fail(c, PA_ERR_PROTOCOL);
796 goto finish;
797 }
798
799 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
800 goto finish;
801
802 if (s->state != PA_STREAM_READY)
803 goto finish;
804
805 s->requested_bytes += bytes;
806
807 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
808
809 if (s->requested_bytes > 0 && s->write_callback)
810 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
811
812 finish:
813 pa_context_unref(c);
814 }
815
816 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
817 pa_stream *s;
818 pa_context *c = userdata;
819 uint32_t channel;
820
821 pa_assert(pd);
822 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
823 pa_assert(t);
824 pa_assert(c);
825 pa_assert(PA_REFCNT_VALUE(c) >= 1);
826
827 pa_context_ref(c);
828
829 if (pa_tagstruct_getu32(t, &channel) < 0 ||
830 !pa_tagstruct_eof(t)) {
831 pa_context_fail(c, PA_ERR_PROTOCOL);
832 goto finish;
833 }
834
835 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
836 goto finish;
837
838 if (s->state != PA_STREAM_READY)
839 goto finish;
840
841 if (s->buffer_attr.prebuf > 0)
842 check_smoother_status(s, TRUE, FALSE, TRUE);
843
844 request_auto_timing_update(s, TRUE);
845
846 if (command == PA_COMMAND_OVERFLOW) {
847 if (s->overflow_callback)
848 s->overflow_callback(s, s->overflow_userdata);
849 } else if (command == PA_COMMAND_UNDERFLOW) {
850 if (s->underflow_callback)
851 s->underflow_callback(s, s->underflow_userdata);
852 }
853
854 finish:
855 pa_context_unref(c);
856 }
857
858 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
859 pa_assert(s);
860 pa_assert(PA_REFCNT_VALUE(s) >= 1);
861
862 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
863
864 if (s->state != PA_STREAM_READY)
865 return;
866
867 if (w) {
868 s->write_index_not_before = s->context->ctag;
869
870 if (s->timing_info_valid)
871 s->timing_info.write_index_corrupt = TRUE;
872
873 /* pa_log("write_index invalidated"); */
874 }
875
876 if (r) {
877 s->read_index_not_before = s->context->ctag;
878
879 if (s->timing_info_valid)
880 s->timing_info.read_index_corrupt = TRUE;
881
882 /* pa_log("read_index invalidated"); */
883 }
884
885 request_auto_timing_update(s, TRUE);
886 }
887
888 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
889 pa_stream *s = userdata;
890
891 pa_assert(s);
892 pa_assert(PA_REFCNT_VALUE(s) >= 1);
893
894 pa_stream_ref(s);
895 request_auto_timing_update(s, FALSE);
896 pa_stream_unref(s);
897 }
898
899 static void create_stream_complete(pa_stream *s) {
900 pa_assert(s);
901 pa_assert(PA_REFCNT_VALUE(s) >= 1);
902 pa_assert(s->state == PA_STREAM_CREATING);
903
904 pa_stream_set_state(s, PA_STREAM_READY);
905
906 if (s->requested_bytes > 0 && s->write_callback)
907 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
908
909 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
910 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
911 pa_assert(!s->auto_timing_update_event);
912 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
913
914 request_auto_timing_update(s, TRUE);
915 }
916
917 check_smoother_status(s, TRUE, FALSE, FALSE);
918 }
919
920 static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
921 const char *e;
922
923 pa_assert(s);
924 pa_assert(attr);
925
926 if ((e = getenv("PULSE_LATENCY_MSEC"))) {
927 uint32_t ms;
928
929 if (pa_atou(e, &ms) < 0 || ms <= 0)
930 pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
931 else {
932 attr->maxlength = (uint32_t) -1;
933 attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
934 attr->minreq = (uint32_t) -1;
935 attr->prebuf = (uint32_t) -1;
936 attr->fragsize = attr->tlength;
937 }
938
939 if (flags)
940 *flags |= PA_STREAM_ADJUST_LATENCY;
941 }
942
943 if (s->context->version >= 13)
944 return;
945
946 /* Version older than 0.9.10 didn't do server side buffer_attr
947 * selection, hence we have to fake it on the client side. */
948
949 /* We choose fairly conservative values here, to not confuse
950 * old clients with extremely large playback buffers */
951
952 if (attr->maxlength == (uint32_t) -1)
953 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
954
955 if (attr->tlength == (uint32_t) -1)
956 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
957
958 if (attr->minreq == (uint32_t) -1)
959 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
960
961 if (attr->prebuf == (uint32_t) -1)
962 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
963
964 if (attr->fragsize == (uint32_t) -1)
965 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
966 }
967
968 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
969 pa_stream *s = userdata;
970 uint32_t requested_bytes = 0;
971
972 pa_assert(pd);
973 pa_assert(s);
974 pa_assert(PA_REFCNT_VALUE(s) >= 1);
975 pa_assert(s->state == PA_STREAM_CREATING);
976
977 pa_stream_ref(s);
978
979 if (command != PA_COMMAND_REPLY) {
980 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
981 goto finish;
982
983 pa_stream_set_state(s, PA_STREAM_FAILED);
984 goto finish;
985 }
986
987 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
988 s->channel == PA_INVALID_INDEX ||
989 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
990 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
991 pa_context_fail(s->context, PA_ERR_PROTOCOL);
992 goto finish;
993 }
994
995 s->requested_bytes = (int64_t) requested_bytes;
996
997 if (s->context->version >= 9) {
998 if (s->direction == PA_STREAM_PLAYBACK) {
999 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1000 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1001 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1002 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1003 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1004 goto finish;
1005 }
1006 } else if (s->direction == PA_STREAM_RECORD) {
1007 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1008 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1009 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1010 goto finish;
1011 }
1012 }
1013 }
1014
1015 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1016 pa_sample_spec ss;
1017 pa_channel_map cm;
1018 const char *dn = NULL;
1019 pa_bool_t suspended;
1020
1021 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1022 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1023 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1024 pa_tagstruct_gets(t, &dn) < 0 ||
1025 pa_tagstruct_get_boolean(t, &suspended) < 0) {
1026 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1027 goto finish;
1028 }
1029
1030 if (!dn || s->device_index == PA_INVALID_INDEX ||
1031 ss.channels != cm.channels ||
1032 !pa_channel_map_valid(&cm) ||
1033 !pa_sample_spec_valid(&ss) ||
1034 (s->n_formats == 0 && (
1035 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1036 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1037 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1038 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1039 goto finish;
1040 }
1041
1042 pa_xfree(s->device_name);
1043 s->device_name = pa_xstrdup(dn);
1044 s->suspended = suspended;
1045
1046 s->channel_map = cm;
1047 s->sample_spec = ss;
1048 }
1049
1050 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1051 pa_usec_t usec;
1052
1053 if (pa_tagstruct_get_usec(t, &usec) < 0) {
1054 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1055 goto finish;
1056 }
1057
1058 if (s->direction == PA_STREAM_RECORD)
1059 s->timing_info.configured_source_usec = usec;
1060 else
1061 s->timing_info.configured_sink_usec = usec;
1062 }
1063
1064 if (s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK) {
1065 pa_format_info *f = pa_format_info_new();
1066 pa_tagstruct_get_format_info(t, f);
1067
1068 if (pa_format_info_valid(f))
1069 s->format = f;
1070 else {
1071 pa_format_info_free(f);
1072 if (s->n_formats > 0) {
1073 /* We used the extended API, so we should have got back a proper format */
1074 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1075 goto finish;
1076 }
1077 }
1078 }
1079
1080 if (!pa_tagstruct_eof(t)) {
1081 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1082 goto finish;
1083 }
1084
1085 if (s->direction == PA_STREAM_RECORD) {
1086 pa_assert(!s->record_memblockq);
1087
1088 s->record_memblockq = pa_memblockq_new(
1089 0,
1090 s->buffer_attr.maxlength,
1091 0,
1092 pa_frame_size(&s->sample_spec),
1093 1,
1094 0,
1095 0,
1096 NULL);
1097 }
1098
1099 s->channel_valid = TRUE;
1100 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1101
1102 create_stream_complete(s);
1103
1104 finish:
1105 pa_stream_unref(s);
1106 }
1107
1108 static int create_stream(
1109 pa_stream_direction_t direction,
1110 pa_stream *s,
1111 const char *dev,
1112 const pa_buffer_attr *attr,
1113 pa_stream_flags_t flags,
1114 const pa_cvolume *volume,
1115 pa_stream *sync_stream) {
1116
1117 pa_tagstruct *t;
1118 uint32_t tag;
1119 pa_bool_t volume_set = FALSE;
1120 uint32_t i;
1121
1122 pa_assert(s);
1123 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1124 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1125
1126 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1127 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1128 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1129 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1130 PA_STREAM_INTERPOLATE_TIMING|
1131 PA_STREAM_NOT_MONOTONIC|
1132 PA_STREAM_AUTO_TIMING_UPDATE|
1133 PA_STREAM_NO_REMAP_CHANNELS|
1134 PA_STREAM_NO_REMIX_CHANNELS|
1135 PA_STREAM_FIX_FORMAT|
1136 PA_STREAM_FIX_RATE|
1137 PA_STREAM_FIX_CHANNELS|
1138 PA_STREAM_DONT_MOVE|
1139 PA_STREAM_VARIABLE_RATE|
1140 PA_STREAM_PEAK_DETECT|
1141 PA_STREAM_START_MUTED|
1142 PA_STREAM_ADJUST_LATENCY|
1143 PA_STREAM_EARLY_REQUESTS|
1144 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1145 PA_STREAM_START_UNMUTED|
1146 PA_STREAM_FAIL_ON_SUSPEND|
1147 PA_STREAM_RELATIVE_VOLUME|
1148 PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1149
1150
1151 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1152 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1153 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1154 /* Althought some of the other flags are not supported on older
1155 * version, we don't check for them here, because it doesn't hurt
1156 * when they are passed but actually not supported. This makes
1157 * client development easier */
1158
1159 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1160 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1161 PA_CHECK_VALIDITY(s->context, !volume || (pa_sample_spec_valid(&s->sample_spec) && volume->channels == s->sample_spec.channels), PA_ERR_INVALID);
1162 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1163 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1164
1165 pa_stream_ref(s);
1166
1167 s->direction = direction;
1168
1169 if (sync_stream)
1170 s->syncid = sync_stream->syncid;
1171
1172 if (attr)
1173 s->buffer_attr = *attr;
1174 patch_buffer_attr(s, &s->buffer_attr, &flags);
1175
1176 s->flags = flags;
1177 s->corked = !!(flags & PA_STREAM_START_CORKED);
1178
1179 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1180 pa_usec_t x;
1181
1182 x = pa_rtclock_now();
1183
1184 pa_assert(!s->smoother);
1185 s->smoother = pa_smoother_new(
1186 SMOOTHER_ADJUST_TIME,
1187 SMOOTHER_HISTORY_TIME,
1188 !(flags & PA_STREAM_NOT_MONOTONIC),
1189 TRUE,
1190 SMOOTHER_MIN_HISTORY,
1191 x,
1192 TRUE);
1193 }
1194
1195 if (!dev)
1196 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1197
1198 t = pa_tagstruct_command(
1199 s->context,
1200 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1201 &tag);
1202
1203 if (s->context->version < 13)
1204 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1205
1206 pa_tagstruct_put(
1207 t,
1208 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1209 PA_TAG_CHANNEL_MAP, &s->channel_map,
1210 PA_TAG_U32, PA_INVALID_INDEX,
1211 PA_TAG_STRING, dev,
1212 PA_TAG_U32, s->buffer_attr.maxlength,
1213 PA_TAG_BOOLEAN, s->corked,
1214 PA_TAG_INVALID);
1215
1216 if (s->direction == PA_STREAM_PLAYBACK) {
1217 pa_cvolume cv;
1218
1219 pa_tagstruct_put(
1220 t,
1221 PA_TAG_U32, s->buffer_attr.tlength,
1222 PA_TAG_U32, s->buffer_attr.prebuf,
1223 PA_TAG_U32, s->buffer_attr.minreq,
1224 PA_TAG_U32, s->syncid,
1225 PA_TAG_INVALID);
1226
1227 volume_set = !!volume;
1228
1229 if (!volume) {
1230 if (pa_sample_spec_valid(&s->sample_spec))
1231 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1232 else {
1233 /* This is not really relevant, since no volume was set, and
1234 * the real number of channels is embedded in the format_info
1235 * structure */
1236 volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1237 }
1238 }
1239
1240 pa_tagstruct_put_cvolume(t, volume);
1241 } else
1242 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1243
1244 if (s->context->version >= 12) {
1245 pa_tagstruct_put(
1246 t,
1247 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1248 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1249 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1250 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1251 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1252 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1253 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1254 PA_TAG_INVALID);
1255 }
1256
1257 if (s->context->version >= 13) {
1258
1259 if (s->direction == PA_STREAM_PLAYBACK)
1260 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1261 else
1262 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1263
1264 pa_tagstruct_put(
1265 t,
1266 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1267 PA_TAG_PROPLIST, s->proplist,
1268 PA_TAG_INVALID);
1269
1270 if (s->direction == PA_STREAM_RECORD)
1271 pa_tagstruct_putu32(t, s->direct_on_input);
1272 }
1273
1274 if (s->context->version >= 14) {
1275
1276 if (s->direction == PA_STREAM_PLAYBACK)
1277 pa_tagstruct_put_boolean(t, volume_set);
1278
1279 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1280 }
1281
1282 if (s->context->version >= 15) {
1283
1284 if (s->direction == PA_STREAM_PLAYBACK)
1285 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1286
1287 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1288 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1289 }
1290
1291 if (s->context->version >= 17) {
1292
1293 if (s->direction == PA_STREAM_PLAYBACK)
1294 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1295
1296 }
1297
1298 if (s->context->version >= 18) {
1299
1300 if (s->direction == PA_STREAM_PLAYBACK)
1301 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1302 }
1303
1304 if (s->context->version >= 21) {
1305
1306 if (s->direction == PA_STREAM_PLAYBACK) {
1307 pa_tagstruct_putu8(t, s->n_formats);
1308 for (i = 0; i < s->n_formats; i++)
1309 pa_tagstruct_put_format_info(t, s->req_formats[i]);
1310 }
1311 }
1312
1313 pa_pstream_send_tagstruct(s->context->pstream, t);
1314 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1315
1316 pa_stream_set_state(s, PA_STREAM_CREATING);
1317
1318 pa_stream_unref(s);
1319 return 0;
1320 }
1321
1322 int pa_stream_connect_playback(
1323 pa_stream *s,
1324 const char *dev,
1325 const pa_buffer_attr *attr,
1326 pa_stream_flags_t flags,
1327 const pa_cvolume *volume,
1328 pa_stream *sync_stream) {
1329
1330 pa_assert(s);
1331 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1332
1333 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1334 }
1335
1336 int pa_stream_connect_record(
1337 pa_stream *s,
1338 const char *dev,
1339 const pa_buffer_attr *attr,
1340 pa_stream_flags_t flags) {
1341
1342 pa_assert(s);
1343 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1344
1345 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1346 }
1347
1348 int pa_stream_begin_write(
1349 pa_stream *s,
1350 void **data,
1351 size_t *nbytes) {
1352
1353 pa_assert(s);
1354 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1355
1356 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1357 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1358 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1359 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1360 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1361
1362 if (*nbytes != (size_t) -1) {
1363 size_t m, fs;
1364
1365 m = pa_mempool_block_size_max(s->context->mempool);
1366 fs = pa_frame_size(&s->sample_spec);
1367
1368 m = (m / fs) * fs;
1369 if (*nbytes > m)
1370 *nbytes = m;
1371 }
1372
1373 if (!s->write_memblock) {
1374 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1375 s->write_data = pa_memblock_acquire(s->write_memblock);
1376 }
1377
1378 *data = s->write_data;
1379 *nbytes = pa_memblock_get_length(s->write_memblock);
1380
1381 return 0;
1382 }
1383
1384 int pa_stream_cancel_write(
1385 pa_stream *s) {
1386
1387 pa_assert(s);
1388 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1389
1390 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1391 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1392 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1393 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1394
1395 pa_assert(s->write_data);
1396
1397 pa_memblock_release(s->write_memblock);
1398 pa_memblock_unref(s->write_memblock);
1399 s->write_memblock = NULL;
1400 s->write_data = NULL;
1401
1402 return 0;
1403 }
1404
1405 int pa_stream_write(
1406 pa_stream *s,
1407 const void *data,
1408 size_t length,
1409 pa_free_cb_t free_cb,
1410 int64_t offset,
1411 pa_seek_mode_t seek) {
1412
1413 pa_assert(s);
1414 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1415 pa_assert(data);
1416
1417 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1418 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1419 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1420 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1421 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1422 PA_CHECK_VALIDITY(s->context,
1423 !s->write_memblock ||
1424 ((data >= s->write_data) &&
1425 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1426 PA_ERR_INVALID);
1427 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1428
1429 if (s->write_memblock) {
1430 pa_memchunk chunk;
1431
1432 /* pa_stream_write_begin() was called before */
1433
1434 pa_memblock_release(s->write_memblock);
1435
1436 chunk.memblock = s->write_memblock;
1437 chunk.index = (const char *) data - (const char *) s->write_data;
1438 chunk.length = length;
1439
1440 s->write_memblock = NULL;
1441 s->write_data = NULL;
1442
1443 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1444 pa_memblock_unref(chunk.memblock);
1445
1446 } else {
1447 pa_seek_mode_t t_seek = seek;
1448 int64_t t_offset = offset;
1449 size_t t_length = length;
1450 const void *t_data = data;
1451
1452 /* pa_stream_write_begin() was not called before */
1453
1454 while (t_length > 0) {
1455 pa_memchunk chunk;
1456
1457 chunk.index = 0;
1458
1459 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1460 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1461 chunk.length = t_length;
1462 } else {
1463 void *d;
1464
1465 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1466 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1467
1468 d = pa_memblock_acquire(chunk.memblock);
1469 memcpy(d, t_data, chunk.length);
1470 pa_memblock_release(chunk.memblock);
1471 }
1472
1473 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1474
1475 t_offset = 0;
1476 t_seek = PA_SEEK_RELATIVE;
1477
1478 t_data = (const uint8_t*) t_data + chunk.length;
1479 t_length -= chunk.length;
1480
1481 pa_memblock_unref(chunk.memblock);
1482 }
1483
1484 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1485 free_cb((void*) data);
1486 }
1487
1488 /* This is obviously wrong since we ignore the seeking index . But
1489 * that's OK, the server side applies the same error */
1490 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1491
1492 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1493
1494 if (s->direction == PA_STREAM_PLAYBACK) {
1495
1496 /* Update latency request correction */
1497 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1498
1499 if (seek == PA_SEEK_ABSOLUTE) {
1500 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1501 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1502 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1503 } else if (seek == PA_SEEK_RELATIVE) {
1504 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1505 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1506 } else
1507 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1508 }
1509
1510 /* Update the write index in the already available latency data */
1511 if (s->timing_info_valid) {
1512
1513 if (seek == PA_SEEK_ABSOLUTE) {
1514 s->timing_info.write_index_corrupt = FALSE;
1515 s->timing_info.write_index = offset + (int64_t) length;
1516 } else if (seek == PA_SEEK_RELATIVE) {
1517 if (!s->timing_info.write_index_corrupt)
1518 s->timing_info.write_index += offset + (int64_t) length;
1519 } else
1520 s->timing_info.write_index_corrupt = TRUE;
1521 }
1522
1523 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1524 request_auto_timing_update(s, TRUE);
1525 }
1526
1527 return 0;
1528 }
1529
1530 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1531 pa_assert(s);
1532 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1533 pa_assert(data);
1534 pa_assert(length);
1535
1536 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1537 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1538 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1539
1540 if (!s->peek_memchunk.memblock) {
1541
1542 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1543 *data = NULL;
1544 *length = 0;
1545 return 0;
1546 }
1547
1548 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1549 }
1550
1551 pa_assert(s->peek_data);
1552 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1553 *length = s->peek_memchunk.length;
1554 return 0;
1555 }
1556
1557 int pa_stream_drop(pa_stream *s) {
1558 pa_assert(s);
1559 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1560
1561 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1562 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1563 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1564 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1565
1566 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1567
1568 /* Fix the simulated local read index */
1569 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1570 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1571
1572 pa_assert(s->peek_data);
1573 pa_memblock_release(s->peek_memchunk.memblock);
1574 pa_memblock_unref(s->peek_memchunk.memblock);
1575 pa_memchunk_reset(&s->peek_memchunk);
1576
1577 return 0;
1578 }
1579
1580 size_t pa_stream_writable_size(pa_stream *s) {
1581 pa_assert(s);
1582 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1583
1584 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1585 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1586 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1587
1588 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1589 }
1590
1591 size_t pa_stream_readable_size(pa_stream *s) {
1592 pa_assert(s);
1593 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1594
1595 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1596 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1597 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1598
1599 return pa_memblockq_get_length(s->record_memblockq);
1600 }
1601
1602 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1603 pa_operation *o;
1604 pa_tagstruct *t;
1605 uint32_t tag;
1606
1607 pa_assert(s);
1608 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1609
1610 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1611 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1612 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1613
1614 /* Ask for a timing update before we cork/uncork to get the best
1615 * accuracy for the transport latency suitable for the
1616 * check_smoother_status() call in the started callback */
1617 request_auto_timing_update(s, TRUE);
1618
1619 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1620
1621 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1622 pa_tagstruct_putu32(t, s->channel);
1623 pa_pstream_send_tagstruct(s->context->pstream, t);
1624 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1625
1626 /* This might cause the read index to conitnue again, hence
1627 * let's request a timing update */
1628 request_auto_timing_update(s, TRUE);
1629
1630 return o;
1631 }
1632
1633 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1634 pa_usec_t usec;
1635
1636 pa_assert(s);
1637 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1638 pa_assert(s->state == PA_STREAM_READY);
1639 pa_assert(s->direction != PA_STREAM_UPLOAD);
1640 pa_assert(s->timing_info_valid);
1641 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1642 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1643
1644 if (s->direction == PA_STREAM_PLAYBACK) {
1645 /* The last byte that was written into the output device
1646 * had this time value associated */
1647 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1648
1649 if (!s->corked && !s->suspended) {
1650
1651 if (!ignore_transport)
1652 /* Because the latency info took a little time to come
1653 * to us, we assume that the real output time is actually
1654 * a little ahead */
1655 usec += s->timing_info.transport_usec;
1656
1657 /* However, the output device usually maintains a buffer
1658 too, hence the real sample currently played is a little
1659 back */
1660 if (s->timing_info.sink_usec >= usec)
1661 usec = 0;
1662 else
1663 usec -= s->timing_info.sink_usec;
1664 }
1665
1666 } else {
1667 pa_assert(s->direction == PA_STREAM_RECORD);
1668
1669 /* The last byte written into the server side queue had
1670 * this time value associated */
1671 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1672
1673 if (!s->corked && !s->suspended) {
1674
1675 if (!ignore_transport)
1676 /* Add transport latency */
1677 usec += s->timing_info.transport_usec;
1678
1679 /* Add latency of data in device buffer */
1680 usec += s->timing_info.source_usec;
1681
1682 /* If this is a monitor source, we need to correct the
1683 * time by the playback device buffer */
1684 if (s->timing_info.sink_usec >= usec)
1685 usec = 0;
1686 else
1687 usec -= s->timing_info.sink_usec;
1688 }
1689 }
1690
1691 return usec;
1692 }
1693
1694 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1695 pa_operation *o = userdata;
1696 struct timeval local, remote, now;
1697 pa_timing_info *i;
1698 pa_bool_t playing = FALSE;
1699 uint64_t underrun_for = 0, playing_for = 0;
1700
1701 pa_assert(pd);
1702 pa_assert(o);
1703 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1704
1705 if (!o->context || !o->stream)
1706 goto finish;
1707
1708 i = &o->stream->timing_info;
1709
1710 o->stream->timing_info_valid = FALSE;
1711 i->write_index_corrupt = TRUE;
1712 i->read_index_corrupt = TRUE;
1713
1714 if (command != PA_COMMAND_REPLY) {
1715 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1716 goto finish;
1717
1718 } else {
1719
1720 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1721 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1722 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1723 pa_tagstruct_get_timeval(t, &local) < 0 ||
1724 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1725 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1726 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1727
1728 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1729 goto finish;
1730 }
1731
1732 if (o->context->version >= 13 &&
1733 o->stream->direction == PA_STREAM_PLAYBACK)
1734 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1735 pa_tagstruct_getu64(t, &playing_for) < 0) {
1736
1737 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1738 goto finish;
1739 }
1740
1741
1742 if (!pa_tagstruct_eof(t)) {
1743 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1744 goto finish;
1745 }
1746 o->stream->timing_info_valid = TRUE;
1747 i->write_index_corrupt = FALSE;
1748 i->read_index_corrupt = FALSE;
1749
1750 i->playing = (int) playing;
1751 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1752
1753 pa_gettimeofday(&now);
1754
1755 /* Calculcate timestamps */
1756 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1757 /* local and remote seem to have synchronized clocks */
1758
1759 if (o->stream->direction == PA_STREAM_PLAYBACK)
1760 i->transport_usec = pa_timeval_diff(&remote, &local);
1761 else
1762 i->transport_usec = pa_timeval_diff(&now, &remote);
1763
1764 i->synchronized_clocks = TRUE;
1765 i->timestamp = remote;
1766 } else {
1767 /* clocks are not synchronized, let's estimate latency then */
1768 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1769 i->synchronized_clocks = FALSE;
1770 i->timestamp = local;
1771 pa_timeval_add(&i->timestamp, i->transport_usec);
1772 }
1773
1774 /* Invalidate read and write indexes if necessary */
1775 if (tag < o->stream->read_index_not_before)
1776 i->read_index_corrupt = TRUE;
1777
1778 if (tag < o->stream->write_index_not_before)
1779 i->write_index_corrupt = TRUE;
1780
1781 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1782 /* Write index correction */
1783
1784 int n, j;
1785 uint32_t ctag = tag;
1786
1787 /* Go through the saved correction values and add up the
1788 * total correction.*/
1789 for (n = 0, j = o->stream->current_write_index_correction+1;
1790 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1791 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1792
1793 /* Step over invalid data or out-of-date data */
1794 if (!o->stream->write_index_corrections[j].valid ||
1795 o->stream->write_index_corrections[j].tag < ctag)
1796 continue;
1797
1798 /* Make sure that everything is in order */
1799 ctag = o->stream->write_index_corrections[j].tag+1;
1800
1801 /* Now fix the write index */
1802 if (o->stream->write_index_corrections[j].corrupt) {
1803 /* A corrupting seek was made */
1804 i->write_index_corrupt = TRUE;
1805 } else if (o->stream->write_index_corrections[j].absolute) {
1806 /* An absolute seek was made */
1807 i->write_index = o->stream->write_index_corrections[j].value;
1808 i->write_index_corrupt = FALSE;
1809 } else if (!i->write_index_corrupt) {
1810 /* A relative seek was made */
1811 i->write_index += o->stream->write_index_corrections[j].value;
1812 }
1813 }
1814
1815 /* Clear old correction entries */
1816 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1817 if (!o->stream->write_index_corrections[n].valid)
1818 continue;
1819
1820 if (o->stream->write_index_corrections[n].tag <= tag)
1821 o->stream->write_index_corrections[n].valid = FALSE;
1822 }
1823 }
1824
1825 if (o->stream->direction == PA_STREAM_RECORD) {
1826 /* Read index correction */
1827
1828 if (!i->read_index_corrupt)
1829 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1830 }
1831
1832 /* Update smoother if we're not corked */
1833 if (o->stream->smoother && !o->stream->corked) {
1834 pa_usec_t u, x;
1835
1836 u = x = pa_rtclock_now() - i->transport_usec;
1837
1838 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1839 pa_usec_t su;
1840
1841 /* If we weren't playing then it will take some time
1842 * until the audio will actually come out through the
1843 * speakers. Since we follow that timing here, we need
1844 * to try to fix this up */
1845
1846 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1847
1848 if (su < i->sink_usec)
1849 x += i->sink_usec - su;
1850 }
1851
1852 if (!i->playing)
1853 pa_smoother_pause(o->stream->smoother, x);
1854
1855 /* Update the smoother */
1856 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1857 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1858 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1859
1860 if (i->playing)
1861 pa_smoother_resume(o->stream->smoother, x, TRUE);
1862 }
1863 }
1864
1865 o->stream->auto_timing_update_requested = FALSE;
1866
1867 if (o->stream->latency_update_callback)
1868 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1869
1870 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1871 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1872 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1873 }
1874
1875 finish:
1876
1877 pa_operation_done(o);
1878 pa_operation_unref(o);
1879 }
1880
1881 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1882 uint32_t tag;
1883 pa_operation *o;
1884 pa_tagstruct *t;
1885 struct timeval now;
1886 int cidx = 0;
1887
1888 pa_assert(s);
1889 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1890
1891 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1892 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1893 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1894
1895 if (s->direction == PA_STREAM_PLAYBACK) {
1896 /* Find a place to store the write_index correction data for this entry */
1897 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1898
1899 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1900 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1901 }
1902 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1903
1904 t = pa_tagstruct_command(
1905 s->context,
1906 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1907 &tag);
1908 pa_tagstruct_putu32(t, s->channel);
1909 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1910
1911 pa_pstream_send_tagstruct(s->context->pstream, t);
1912 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1913
1914 if (s->direction == PA_STREAM_PLAYBACK) {
1915 /* Fill in initial correction data */
1916
1917 s->current_write_index_correction = cidx;
1918
1919 s->write_index_corrections[cidx].valid = TRUE;
1920 s->write_index_corrections[cidx].absolute = FALSE;
1921 s->write_index_corrections[cidx].corrupt = FALSE;
1922 s->write_index_corrections[cidx].tag = tag;
1923 s->write_index_corrections[cidx].value = 0;
1924 }
1925
1926 return o;
1927 }
1928
1929 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1930 pa_stream *s = userdata;
1931
1932 pa_assert(pd);
1933 pa_assert(s);
1934 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1935
1936 pa_stream_ref(s);
1937
1938 if (command != PA_COMMAND_REPLY) {
1939 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1940 goto finish;
1941
1942 pa_stream_set_state(s, PA_STREAM_FAILED);
1943 goto finish;
1944 } else if (!pa_tagstruct_eof(t)) {
1945 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1946 goto finish;
1947 }
1948
1949 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1950
1951 finish:
1952 pa_stream_unref(s);
1953 }
1954
1955 int pa_stream_disconnect(pa_stream *s) {
1956 pa_tagstruct *t;
1957 uint32_t tag;
1958
1959 pa_assert(s);
1960 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1961
1962 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1963 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1964 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1965
1966 pa_stream_ref(s);
1967
1968 t = pa_tagstruct_command(
1969 s->context,
1970 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1971 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1972 &tag);
1973 pa_tagstruct_putu32(t, s->channel);
1974 pa_pstream_send_tagstruct(s->context->pstream, t);
1975 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1976
1977 pa_stream_unref(s);
1978 return 0;
1979 }
1980
1981 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1982 pa_assert(s);
1983 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1984
1985 if (pa_detect_fork())
1986 return;
1987
1988 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1989 return;
1990
1991 s->read_callback = cb;
1992 s->read_userdata = userdata;
1993 }
1994
1995 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1996 pa_assert(s);
1997 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1998
1999 if (pa_detect_fork())
2000 return;
2001
2002 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2003 return;
2004
2005 s->write_callback = cb;
2006 s->write_userdata = userdata;
2007 }
2008
2009 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2010 pa_assert(s);
2011 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2012
2013 if (pa_detect_fork())
2014 return;
2015
2016 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2017 return;
2018
2019 s->state_callback = cb;
2020 s->state_userdata = userdata;
2021 }
2022
2023 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2024 pa_assert(s);
2025 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2026
2027 if (pa_detect_fork())
2028 return;
2029
2030 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2031 return;
2032
2033 s->overflow_callback = cb;
2034 s->overflow_userdata = userdata;
2035 }
2036
2037 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2038 pa_assert(s);
2039 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2040
2041 if (pa_detect_fork())
2042 return;
2043
2044 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2045 return;
2046
2047 s->underflow_callback = cb;
2048 s->underflow_userdata = userdata;
2049 }
2050
2051 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2052 pa_assert(s);
2053 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2054
2055 if (pa_detect_fork())
2056 return;
2057
2058 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2059 return;
2060
2061 s->latency_update_callback = cb;
2062 s->latency_update_userdata = userdata;
2063 }
2064
2065 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2066 pa_assert(s);
2067 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2068
2069 if (pa_detect_fork())
2070 return;
2071
2072 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2073 return;
2074
2075 s->moved_callback = cb;
2076 s->moved_userdata = userdata;
2077 }
2078
2079 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2080 pa_assert(s);
2081 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2082
2083 if (pa_detect_fork())
2084 return;
2085
2086 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2087 return;
2088
2089 s->suspended_callback = cb;
2090 s->suspended_userdata = userdata;
2091 }
2092
2093 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2094 pa_assert(s);
2095 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2096
2097 if (pa_detect_fork())
2098 return;
2099
2100 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2101 return;
2102
2103 s->started_callback = cb;
2104 s->started_userdata = userdata;
2105 }
2106
2107 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2108 pa_assert(s);
2109 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2110
2111 if (pa_detect_fork())
2112 return;
2113
2114 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2115 return;
2116
2117 s->event_callback = cb;
2118 s->event_userdata = userdata;
2119 }
2120
2121 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2122 pa_assert(s);
2123 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2124
2125 if (pa_detect_fork())
2126 return;
2127
2128 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2129 return;
2130
2131 s->buffer_attr_callback = cb;
2132 s->buffer_attr_userdata = userdata;
2133 }
2134
2135 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2136 pa_operation *o = userdata;
2137 int success = 1;
2138
2139 pa_assert(pd);
2140 pa_assert(o);
2141 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2142
2143 if (!o->context)
2144 goto finish;
2145
2146 if (command != PA_COMMAND_REPLY) {
2147 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2148 goto finish;
2149
2150 success = 0;
2151 } else if (!pa_tagstruct_eof(t)) {
2152 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2153 goto finish;
2154 }
2155
2156 if (o->callback) {
2157 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2158 cb(o->stream, success, o->userdata);
2159 }
2160
2161 finish:
2162 pa_operation_done(o);
2163 pa_operation_unref(o);
2164 }
2165
2166 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2167 pa_operation *o;
2168 pa_tagstruct *t;
2169 uint32_t tag;
2170
2171 pa_assert(s);
2172 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2173
2174 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2175 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2176 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2177
2178 /* Ask for a timing update before we cork/uncork to get the best
2179 * accuracy for the transport latency suitable for the
2180 * check_smoother_status() call in the started callback */
2181 request_auto_timing_update(s, TRUE);
2182
2183 s->corked = b;
2184
2185 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2186
2187 t = pa_tagstruct_command(
2188 s->context,
2189 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2190 &tag);
2191 pa_tagstruct_putu32(t, s->channel);
2192 pa_tagstruct_put_boolean(t, !!b);
2193 pa_pstream_send_tagstruct(s->context->pstream, t);
2194 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2195
2196 check_smoother_status(s, FALSE, FALSE, FALSE);
2197
2198 /* This might cause the indexes to hang/start again, hence let's
2199 * request a timing update, after the cork/uncork, too */
2200 request_auto_timing_update(s, TRUE);
2201
2202 return o;
2203 }
2204
2205 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2206 pa_tagstruct *t;
2207 pa_operation *o;
2208 uint32_t tag;
2209
2210 pa_assert(s);
2211 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2212
2213 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2214 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2215
2216 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2217
2218 t = pa_tagstruct_command(s->context, command, &tag);
2219 pa_tagstruct_putu32(t, s->channel);
2220 pa_pstream_send_tagstruct(s->context->pstream, t);
2221 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2222
2223 return o;
2224 }
2225
2226 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2227 pa_operation *o;
2228
2229 pa_assert(s);
2230 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2231
2232 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2233 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2234 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2235
2236 /* Ask for a timing update *before* the flush, so that the
2237 * transport usec is as up to date as possible when we get the
2238 * underflow message and update the smoother status*/
2239 request_auto_timing_update(s, TRUE);
2240
2241 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2242 return NULL;
2243
2244 if (s->direction == PA_STREAM_PLAYBACK) {
2245
2246 if (s->write_index_corrections[s->current_write_index_correction].valid)
2247 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2248
2249 if (s->buffer_attr.prebuf > 0)
2250 check_smoother_status(s, FALSE, FALSE, TRUE);
2251
2252 /* This will change the write index, but leave the
2253 * read index untouched. */
2254 invalidate_indexes(s, FALSE, TRUE);
2255
2256 } else
2257 /* For record streams this has no influence on the write
2258 * index, but the read index might jump. */
2259 invalidate_indexes(s, TRUE, FALSE);
2260
2261 /* Note that we do not update requested_bytes here. This is
2262 * because we cannot really know how data actually was dropped
2263 * from the write index due to this. This 'error' will be applied
2264 * by both client and server and hence we should be fine. */
2265
2266 return o;
2267 }
2268
2269 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2270 pa_operation *o;
2271
2272 pa_assert(s);
2273 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2274
2275 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2276 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2277 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2278 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2279
2280 /* Ask for a timing update before we cork/uncork to get the best
2281 * accuracy for the transport latency suitable for the
2282 * check_smoother_status() call in the started callback */
2283 request_auto_timing_update(s, TRUE);
2284
2285 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2286 return NULL;
2287
2288 /* This might cause the read index to hang again, hence
2289 * let's request a timing update */
2290 request_auto_timing_update(s, TRUE);
2291
2292 return o;
2293 }
2294
2295 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2296 pa_operation *o;
2297
2298 pa_assert(s);
2299 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2300
2301 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2302 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2303 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2304 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2305
2306 /* Ask for a timing update before we cork/uncork to get the best
2307 * accuracy for the transport latency suitable for the
2308 * check_smoother_status() call in the started callback */
2309 request_auto_timing_update(s, TRUE);
2310
2311 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2312 return NULL;
2313
2314 /* This might cause the read index to start moving again, hence
2315 * let's request a timing update */
2316 request_auto_timing_update(s, TRUE);
2317
2318 return o;
2319 }
2320
2321 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2322 pa_operation *o;
2323
2324 pa_assert(s);
2325 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2326 pa_assert(name);
2327
2328 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2329 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2330 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2331
2332 if (s->context->version >= 13) {
2333 pa_proplist *p = pa_proplist_new();
2334
2335 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2336 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2337 pa_proplist_free(p);
2338 } else {
2339 pa_tagstruct *t;
2340 uint32_t tag;
2341
2342 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2343 t = pa_tagstruct_command(
2344 s->context,
2345 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2346 &tag);
2347 pa_tagstruct_putu32(t, s->channel);
2348 pa_tagstruct_puts(t, name);
2349 pa_pstream_send_tagstruct(s->context->pstream, t);
2350 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2351 }
2352
2353 return o;
2354 }
2355
2356 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2357 pa_usec_t usec;
2358
2359 pa_assert(s);
2360 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2361
2362 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2363 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2364 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2365 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2366 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2367 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2368
2369 if (s->smoother)
2370 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2371 else
2372 usec = calc_time(s, FALSE);
2373
2374 /* Make sure the time runs monotonically */
2375 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2376 if (usec < s->previous_time)
2377 usec = s->previous_time;
2378 else
2379 s->previous_time = usec;
2380 }
2381
2382 if (r_usec)
2383 *r_usec = usec;
2384
2385 return 0;
2386 }
2387
2388 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2389 pa_assert(s);
2390 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2391
2392 if (negative)
2393 *negative = 0;
2394
2395 if (a >= b)
2396 return a-b;
2397 else {
2398 if (negative && s->direction == PA_STREAM_RECORD) {
2399 *negative = 1;
2400 return b-a;
2401 } else
2402 return 0;
2403 }
2404 }
2405
2406 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2407 pa_usec_t t, c;
2408 int r;
2409 int64_t cindex;
2410
2411 pa_assert(s);
2412 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2413 pa_assert(r_usec);
2414
2415 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2416 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2417 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2418 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2419 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2420 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2421
2422 if ((r = pa_stream_get_time(s, &t)) < 0)
2423 return r;
2424
2425 if (s->direction == PA_STREAM_PLAYBACK)
2426 cindex = s->timing_info.write_index;
2427 else
2428 cindex = s->timing_info.read_index;
2429
2430 if (cindex < 0)
2431 cindex = 0;
2432
2433 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2434
2435 if (s->direction == PA_STREAM_PLAYBACK)
2436 *r_usec = time_counter_diff(s, c, t, negative);
2437 else
2438 *r_usec = time_counter_diff(s, t, c, negative);
2439
2440 return 0;
2441 }
2442
2443 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2444 pa_assert(s);
2445 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2446
2447 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2448 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2449 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2450 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2451
2452 return &s->timing_info;
2453 }
2454
2455 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2456 pa_assert(s);
2457 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2458
2459 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2460
2461 return &s->sample_spec;
2462 }
2463
2464 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2465 pa_assert(s);
2466 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2467
2468 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2469
2470 return &s->channel_map;
2471 }
2472
2473 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2474 pa_assert(s);
2475 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2476
2477 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2478 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2479 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2480
2481 return &s->buffer_attr;
2482 }
2483
2484 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2485 pa_operation *o = userdata;
2486 int success = 1;
2487
2488 pa_assert(pd);
2489 pa_assert(o);
2490 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2491
2492 if (!o->context)
2493 goto finish;
2494
2495 if (command != PA_COMMAND_REPLY) {
2496 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2497 goto finish;
2498
2499 success = 0;
2500 } else {
2501 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2502 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2503 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2504 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2505 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2506 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2507 goto finish;
2508 }
2509 } else if (o->stream->direction == PA_STREAM_RECORD) {
2510 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2511 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2512 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2513 goto finish;
2514 }
2515 }
2516
2517 if (o->stream->context->version >= 13) {
2518 pa_usec_t usec;
2519
2520 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2521 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2522 goto finish;
2523 }
2524
2525 if (o->stream->direction == PA_STREAM_RECORD)
2526 o->stream->timing_info.configured_source_usec = usec;
2527 else
2528 o->stream->timing_info.configured_sink_usec = usec;
2529 }
2530
2531 if (!pa_tagstruct_eof(t)) {
2532 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2533 goto finish;
2534 }
2535 }
2536
2537 if (o->callback) {
2538 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2539 cb(o->stream, success, o->userdata);
2540 }
2541
2542 finish:
2543 pa_operation_done(o);
2544 pa_operation_unref(o);
2545 }
2546
2547
2548 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2549 pa_operation *o;
2550 pa_tagstruct *t;
2551 uint32_t tag;
2552 pa_buffer_attr copy;
2553
2554 pa_assert(s);
2555 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2556 pa_assert(attr);
2557
2558 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2559 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2560 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2561 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2562
2563 /* Ask for a timing update before we cork/uncork to get the best
2564 * accuracy for the transport latency suitable for the
2565 * check_smoother_status() call in the started callback */
2566 request_auto_timing_update(s, TRUE);
2567
2568 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2569
2570 t = pa_tagstruct_command(
2571 s->context,
2572 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2573 &tag);
2574 pa_tagstruct_putu32(t, s->channel);
2575
2576 copy = *attr;
2577 patch_buffer_attr(s, &copy, NULL);
2578 attr = &copy;
2579
2580 pa_tagstruct_putu32(t, attr->maxlength);
2581
2582 if (s->direction == PA_STREAM_PLAYBACK)
2583 pa_tagstruct_put(
2584 t,
2585 PA_TAG_U32, attr->tlength,
2586 PA_TAG_U32, attr->prebuf,
2587 PA_TAG_U32, attr->minreq,
2588 PA_TAG_INVALID);
2589 else
2590 pa_tagstruct_putu32(t, attr->fragsize);
2591
2592 if (s->context->version >= 13)
2593 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2594
2595 if (s->context->version >= 14)
2596 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2597
2598 pa_pstream_send_tagstruct(s->context->pstream, t);
2599 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2600
2601 /* This might cause changes in the read/write indexex, hence let's
2602 * request a timing update */
2603 request_auto_timing_update(s, TRUE);
2604
2605 return o;
2606 }
2607
2608 uint32_t pa_stream_get_device_index(pa_stream *s) {
2609 pa_assert(s);
2610 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2611
2612 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2613 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2614 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2615 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2616 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2617
2618 return s->device_index;
2619 }
2620
2621 const char *pa_stream_get_device_name(pa_stream *s) {
2622 pa_assert(s);
2623 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2624
2625 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2626 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2627 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2628 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2629 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2630
2631 return s->device_name;
2632 }
2633
2634 int pa_stream_is_suspended(pa_stream *s) {
2635 pa_assert(s);
2636 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2637
2638 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2639 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2640 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2641 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2642
2643 return s->suspended;
2644 }
2645
2646 int pa_stream_is_corked(pa_stream *s) {
2647 pa_assert(s);
2648 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2649
2650 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2651 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2652 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2653
2654 return s->corked;
2655 }
2656
2657 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2658 pa_operation *o = userdata;
2659 int success = 1;
2660
2661 pa_assert(pd);
2662 pa_assert(o);
2663 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2664
2665 if (!o->context)
2666 goto finish;
2667
2668 if (command != PA_COMMAND_REPLY) {
2669 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2670 goto finish;
2671
2672 success = 0;
2673 } else {
2674
2675 if (!pa_tagstruct_eof(t)) {
2676 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2677 goto finish;
2678 }
2679 }
2680
2681 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2682 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2683
2684 if (o->callback) {
2685 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2686 cb(o->stream, success, o->userdata);
2687 }
2688
2689 finish:
2690 pa_operation_done(o);
2691 pa_operation_unref(o);
2692 }
2693
2694
2695 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2696 pa_operation *o;
2697 pa_tagstruct *t;
2698 uint32_t tag;
2699
2700 pa_assert(s);
2701 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2702
2703 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2704 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2705 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2706 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2707 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2708 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2709
2710 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2711 o->private = PA_UINT_TO_PTR(rate);
2712
2713 t = pa_tagstruct_command(
2714 s->context,
2715 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2716 &tag);
2717 pa_tagstruct_putu32(t, s->channel);
2718 pa_tagstruct_putu32(t, rate);
2719
2720 pa_pstream_send_tagstruct(s->context->pstream, t);
2721 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2722
2723 return o;
2724 }
2725
2726 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2727 pa_operation *o;
2728 pa_tagstruct *t;
2729 uint32_t tag;
2730
2731 pa_assert(s);
2732 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2733
2734 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2735 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2736 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2737 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2738 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2739
2740 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2741
2742 t = pa_tagstruct_command(
2743 s->context,
2744 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2745 &tag);
2746 pa_tagstruct_putu32(t, s->channel);
2747 pa_tagstruct_putu32(t, (uint32_t) mode);
2748 pa_tagstruct_put_proplist(t, p);
2749
2750 pa_pstream_send_tagstruct(s->context->pstream, t);
2751 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2752
2753 /* Please note that we don't update s->proplist here, because we
2754 * don't export that field */
2755
2756 return o;
2757 }
2758
2759 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2760 pa_operation *o;
2761 pa_tagstruct *t;
2762 uint32_t tag;
2763 const char * const*k;
2764
2765 pa_assert(s);
2766 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2767
2768 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2769 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2770 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2771 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2772 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2773
2774 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2775
2776 t = pa_tagstruct_command(
2777 s->context,
2778 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2779 &tag);
2780 pa_tagstruct_putu32(t, s->channel);
2781
2782 for (k = keys; *k; k++)
2783 pa_tagstruct_puts(t, *k);
2784
2785 pa_tagstruct_puts(t, NULL);
2786
2787 pa_pstream_send_tagstruct(s->context->pstream, t);
2788 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2789
2790 /* Please note that we don't update s->proplist here, because we
2791 * don't export that field */
2792
2793 return o;
2794 }
2795
2796 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2797 pa_assert(s);
2798 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2799
2800 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2801 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2802 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2803 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2804
2805 s->direct_on_input = sink_input_idx;
2806
2807 return 0;
2808 }
2809
2810 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2811 pa_assert(s);
2812 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2813
2814 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2815 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2816 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2817
2818 return s->direct_on_input;
2819 }