]> code.delx.au - pulseaudio/blob - src/pulse/stream.c
client: detect forking in sample cache API, too
[pulseaudio] / src / pulse / stream.c
1 /***
2 This file is part of PulseAudio.
3
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <pulse/def.h>
32 #include <pulse/stream.h>
33 #include <pulse/timeval.h>
34 #include <pulse/rtclock.h>
35 #include <pulse/xmalloc.h>
36
37 #include <pulsecore/pstream-util.h>
38 #include <pulsecore/log.h>
39 #include <pulsecore/hashmap.h>
40 #include <pulsecore/macro.h>
41 #include <pulsecore/core-rtclock.h>
42
43 #include "fork-detect.h"
44 #include "internal.h"
45
/* Bounds of the automatic timing update interval. The interval is reset to
 * the START value whenever an update is forced and is doubled after every
 * timer restart until it is capped at the END value. */
#define AUTO_TIMING_INTERVAL_START_USEC (10 * PA_USEC_PER_MSEC)
#define AUTO_TIMING_INTERVAL_END_USEC (1500 * PA_USEC_PER_MSEC)

/* Parameters handed to pa_smoother_new() when a stream is connected with
 * PA_STREAM_INTERPOLATE_TIMING. */
#define SMOOTHER_ADJUST_TIME (1000 * PA_USEC_PER_MSEC)
#define SMOOTHER_HISTORY_TIME (5000 * PA_USEC_PER_MSEC)
#define SMOOTHER_MIN_HISTORY (4)
52
53 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
54 return pa_stream_new_with_proplist(c, name, ss, map, NULL);
55 }
56
57 static void reset_callbacks(pa_stream *s) {
58 s->read_callback = NULL;
59 s->read_userdata = NULL;
60 s->write_callback = NULL;
61 s->write_userdata = NULL;
62 s->state_callback = NULL;
63 s->state_userdata = NULL;
64 s->overflow_callback = NULL;
65 s->overflow_userdata = NULL;
66 s->underflow_callback = NULL;
67 s->underflow_userdata = NULL;
68 s->latency_update_callback = NULL;
69 s->latency_update_userdata = NULL;
70 s->moved_callback = NULL;
71 s->moved_userdata = NULL;
72 s->suspended_callback = NULL;
73 s->suspended_userdata = NULL;
74 s->started_callback = NULL;
75 s->started_userdata = NULL;
76 s->event_callback = NULL;
77 s->event_userdata = NULL;
78 s->buffer_attr_callback = NULL;
79 s->buffer_attr_userdata = NULL;
80 }
81
82 pa_stream *pa_stream_new_with_proplist(
83 pa_context *c,
84 const char *name,
85 const pa_sample_spec *ss,
86 const pa_channel_map *map,
87 pa_proplist *p) {
88
89 pa_stream *s;
90 int i;
91 pa_channel_map tmap;
92
93 pa_assert(c);
94 pa_assert(PA_REFCNT_VALUE(c) >= 1);
95
96 PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
97 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
98 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
99 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
100 PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
101 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
102 PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103
104 if (!map)
105 PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
106
107 s = pa_xnew(pa_stream, 1);
108 PA_REFCNT_INIT(s);
109 s->context = c;
110 s->mainloop = c->mainloop;
111
112 s->direction = PA_STREAM_NODIRECTION;
113 s->state = PA_STREAM_UNCONNECTED;
114 s->flags = 0;
115
116 s->sample_spec = *ss;
117 s->channel_map = *map;
118
119 s->direct_on_input = PA_INVALID_INDEX;
120
121 s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
122 if (name)
123 pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
124
125 s->channel = 0;
126 s->channel_valid = FALSE;
127 s->syncid = c->csyncid++;
128 s->stream_index = PA_INVALID_INDEX;
129
130 s->requested_bytes = 0;
131 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
132
133 /* We initialize der target length here, so that if the user
134 * passes no explicit buffering metrics the default is similar to
135 * what older PA versions provided. */
136
137 s->buffer_attr.maxlength = (uint32_t) -1;
138 s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
139 s->buffer_attr.minreq = (uint32_t) -1;
140 s->buffer_attr.prebuf = (uint32_t) -1;
141 s->buffer_attr.fragsize = (uint32_t) -1;
142
143 s->device_index = PA_INVALID_INDEX;
144 s->device_name = NULL;
145 s->suspended = FALSE;
146 s->corked = FALSE;
147
148 s->write_memblock = NULL;
149 s->write_data = NULL;
150
151 pa_memchunk_reset(&s->peek_memchunk);
152 s->peek_data = NULL;
153 s->record_memblockq = NULL;
154
155 memset(&s->timing_info, 0, sizeof(s->timing_info));
156 s->timing_info_valid = FALSE;
157
158 s->previous_time = 0;
159
160 s->read_index_not_before = 0;
161 s->write_index_not_before = 0;
162 for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
163 s->write_index_corrections[i].valid = 0;
164 s->current_write_index_correction = 0;
165
166 s->auto_timing_update_event = NULL;
167 s->auto_timing_update_requested = FALSE;
168 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
169
170 reset_callbacks(s);
171
172 s->smoother = NULL;
173
174 /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
175 PA_LLIST_PREPEND(pa_stream, c->streams, s);
176 pa_stream_ref(s);
177
178 return s;
179 }
180
181 static void stream_unlink(pa_stream *s) {
182 pa_operation *o, *n;
183 pa_assert(s);
184
185 if (!s->context)
186 return;
187
188 /* Detach from context */
189
190 /* Unref all operatio object that point to us */
191 for (o = s->context->operations; o; o = n) {
192 n = o->next;
193
194 if (o->stream == s)
195 pa_operation_cancel(o);
196 }
197
198 /* Drop all outstanding replies for this stream */
199 if (s->context->pdispatch)
200 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
201
202 if (s->channel_valid) {
203 pa_hashmap_remove((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, PA_UINT32_TO_PTR(s->channel));
204 s->channel = 0;
205 s->channel_valid = FALSE;
206 }
207
208 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
209 pa_stream_unref(s);
210
211 s->context = NULL;
212
213 if (s->auto_timing_update_event) {
214 pa_assert(s->mainloop);
215 s->mainloop->time_free(s->auto_timing_update_event);
216 }
217
218 reset_callbacks(s);
219 }
220
221 static void stream_free(pa_stream *s) {
222 pa_assert(s);
223
224 stream_unlink(s);
225
226 if (s->write_memblock) {
227 pa_memblock_release(s->write_memblock);
228 pa_memblock_unref(s->write_data);
229 }
230
231 if (s->peek_memchunk.memblock) {
232 if (s->peek_data)
233 pa_memblock_release(s->peek_memchunk.memblock);
234 pa_memblock_unref(s->peek_memchunk.memblock);
235 }
236
237 if (s->record_memblockq)
238 pa_memblockq_free(s->record_memblockq);
239
240 if (s->proplist)
241 pa_proplist_free(s->proplist);
242
243 if (s->smoother)
244 pa_smoother_free(s->smoother);
245
246 pa_xfree(s->device_name);
247 pa_xfree(s);
248 }
249
250 void pa_stream_unref(pa_stream *s) {
251 pa_assert(s);
252 pa_assert(PA_REFCNT_VALUE(s) >= 1);
253
254 if (PA_REFCNT_DEC(s) <= 0)
255 stream_free(s);
256 }
257
258 pa_stream* pa_stream_ref(pa_stream *s) {
259 pa_assert(s);
260 pa_assert(PA_REFCNT_VALUE(s) >= 1);
261
262 PA_REFCNT_INC(s);
263 return s;
264 }
265
266 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
267 pa_assert(s);
268 pa_assert(PA_REFCNT_VALUE(s) >= 1);
269
270 return s->state;
271 }
272
273 pa_context* pa_stream_get_context(pa_stream *s) {
274 pa_assert(s);
275 pa_assert(PA_REFCNT_VALUE(s) >= 1);
276
277 return s->context;
278 }
279
280 uint32_t pa_stream_get_index(pa_stream *s) {
281 pa_assert(s);
282 pa_assert(PA_REFCNT_VALUE(s) >= 1);
283
284 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
285 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
286
287 return s->stream_index;
288 }
289
290 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
291 pa_assert(s);
292 pa_assert(PA_REFCNT_VALUE(s) >= 1);
293
294 if (s->state == st)
295 return;
296
297 pa_stream_ref(s);
298
299 s->state = st;
300
301 if (s->state_callback)
302 s->state_callback(s, s->state_userdata);
303
304 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
305 stream_unlink(s);
306
307 pa_stream_unref(s);
308 }
309
310 static void request_auto_timing_update(pa_stream *s, pa_bool_t force) {
311 pa_assert(s);
312 pa_assert(PA_REFCNT_VALUE(s) >= 1);
313
314 if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
315 return;
316
317 if (s->state == PA_STREAM_READY &&
318 (force || !s->auto_timing_update_requested)) {
319 pa_operation *o;
320
321 /* pa_log("Automatically requesting new timing data"); */
322
323 if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
324 pa_operation_unref(o);
325 s->auto_timing_update_requested = TRUE;
326 }
327 }
328
329 if (s->auto_timing_update_event) {
330 if (force)
331 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
332
333 pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
334
335 s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
336 }
337 }
338
339 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
340 pa_context *c = userdata;
341 pa_stream *s;
342 uint32_t channel;
343
344 pa_assert(pd);
345 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
346 pa_assert(t);
347 pa_assert(c);
348 pa_assert(PA_REFCNT_VALUE(c) >= 1);
349
350 pa_context_ref(c);
351
352 if (pa_tagstruct_getu32(t, &channel) < 0 ||
353 !pa_tagstruct_eof(t)) {
354 pa_context_fail(c, PA_ERR_PROTOCOL);
355 goto finish;
356 }
357
358 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
359 goto finish;
360
361 if (s->state != PA_STREAM_READY)
362 goto finish;
363
364 pa_context_set_error(c, PA_ERR_KILLED);
365 pa_stream_set_state(s, PA_STREAM_FAILED);
366
367 finish:
368 pa_context_unref(c);
369 }
370
371 static void check_smoother_status(pa_stream *s, pa_bool_t aposteriori, pa_bool_t force_start, pa_bool_t force_stop) {
372 pa_usec_t x;
373
374 pa_assert(s);
375 pa_assert(!force_start || !force_stop);
376
377 if (!s->smoother)
378 return;
379
380 x = pa_rtclock_now();
381
382 if (s->timing_info_valid) {
383 if (aposteriori)
384 x -= s->timing_info.transport_usec;
385 else
386 x += s->timing_info.transport_usec;
387 }
388
389 if (s->suspended || s->corked || force_stop)
390 pa_smoother_pause(s->smoother, x);
391 else if (force_start || s->buffer_attr.prebuf == 0) {
392
393 if (!s->timing_info_valid &&
394 !aposteriori &&
395 !force_start &&
396 !force_stop &&
397 s->context->version >= 13) {
398
399 /* If the server supports STARTED events we take them as
400 * indications when audio really starts/stops playing, if
401 * we don't have any timing info yet -- instead of trying
402 * to be smart and guessing the server time. Otherwise the
403 * unknown transport delay add too much noise to our time
404 * calculations. */
405
406 return;
407 }
408
409 pa_smoother_resume(s->smoother, x, TRUE);
410 }
411
412 /* Please note that we have no idea if playback actually started
413 * if prebuf is non-zero! */
414 }
415
416 void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
417 pa_context *c = userdata;
418 pa_stream *s;
419 uint32_t channel;
420 const char *dn;
421 pa_bool_t suspended;
422 uint32_t di;
423 pa_usec_t usec = 0;
424 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
425
426 pa_assert(pd);
427 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
428 pa_assert(t);
429 pa_assert(c);
430 pa_assert(PA_REFCNT_VALUE(c) >= 1);
431
432 pa_context_ref(c);
433
434 if (c->version < 12) {
435 pa_context_fail(c, PA_ERR_PROTOCOL);
436 goto finish;
437 }
438
439 if (pa_tagstruct_getu32(t, &channel) < 0 ||
440 pa_tagstruct_getu32(t, &di) < 0 ||
441 pa_tagstruct_gets(t, &dn) < 0 ||
442 pa_tagstruct_get_boolean(t, &suspended) < 0) {
443 pa_context_fail(c, PA_ERR_PROTOCOL);
444 goto finish;
445 }
446
447 if (c->version >= 13) {
448
449 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
450 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
451 pa_tagstruct_getu32(t, &fragsize) < 0 ||
452 pa_tagstruct_get_usec(t, &usec) < 0) {
453 pa_context_fail(c, PA_ERR_PROTOCOL);
454 goto finish;
455 }
456 } else {
457 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
458 pa_tagstruct_getu32(t, &tlength) < 0 ||
459 pa_tagstruct_getu32(t, &prebuf) < 0 ||
460 pa_tagstruct_getu32(t, &minreq) < 0 ||
461 pa_tagstruct_get_usec(t, &usec) < 0) {
462 pa_context_fail(c, PA_ERR_PROTOCOL);
463 goto finish;
464 }
465 }
466 }
467
468 if (!pa_tagstruct_eof(t)) {
469 pa_context_fail(c, PA_ERR_PROTOCOL);
470 goto finish;
471 }
472
473 if (!dn || di == PA_INVALID_INDEX) {
474 pa_context_fail(c, PA_ERR_PROTOCOL);
475 goto finish;
476 }
477
478 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
479 goto finish;
480
481 if (s->state != PA_STREAM_READY)
482 goto finish;
483
484 if (c->version >= 13) {
485 if (s->direction == PA_STREAM_RECORD)
486 s->timing_info.configured_source_usec = usec;
487 else
488 s->timing_info.configured_sink_usec = usec;
489
490 s->buffer_attr.maxlength = maxlength;
491 s->buffer_attr.fragsize = fragsize;
492 s->buffer_attr.tlength = tlength;
493 s->buffer_attr.prebuf = prebuf;
494 s->buffer_attr.minreq = minreq;
495 }
496
497 pa_xfree(s->device_name);
498 s->device_name = pa_xstrdup(dn);
499 s->device_index = di;
500
501 s->suspended = suspended;
502
503 check_smoother_status(s, TRUE, FALSE, FALSE);
504 request_auto_timing_update(s, TRUE);
505
506 if (s->moved_callback)
507 s->moved_callback(s, s->moved_userdata);
508
509 finish:
510 pa_context_unref(c);
511 }
512
513 void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
514 pa_context *c = userdata;
515 pa_stream *s;
516 uint32_t channel;
517 pa_usec_t usec = 0;
518 uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
519
520 pa_assert(pd);
521 pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
522 pa_assert(t);
523 pa_assert(c);
524 pa_assert(PA_REFCNT_VALUE(c) >= 1);
525
526 pa_context_ref(c);
527
528 if (c->version < 15) {
529 pa_context_fail(c, PA_ERR_PROTOCOL);
530 goto finish;
531 }
532
533 if (pa_tagstruct_getu32(t, &channel) < 0) {
534 pa_context_fail(c, PA_ERR_PROTOCOL);
535 goto finish;
536 }
537
538 if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
539 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
540 pa_tagstruct_getu32(t, &fragsize) < 0 ||
541 pa_tagstruct_get_usec(t, &usec) < 0) {
542 pa_context_fail(c, PA_ERR_PROTOCOL);
543 goto finish;
544 }
545 } else {
546 if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
547 pa_tagstruct_getu32(t, &tlength) < 0 ||
548 pa_tagstruct_getu32(t, &prebuf) < 0 ||
549 pa_tagstruct_getu32(t, &minreq) < 0 ||
550 pa_tagstruct_get_usec(t, &usec) < 0) {
551 pa_context_fail(c, PA_ERR_PROTOCOL);
552 goto finish;
553 }
554 }
555
556 if (!pa_tagstruct_eof(t)) {
557 pa_context_fail(c, PA_ERR_PROTOCOL);
558 goto finish;
559 }
560
561 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
562 goto finish;
563
564 if (s->state != PA_STREAM_READY)
565 goto finish;
566
567 if (s->direction == PA_STREAM_RECORD)
568 s->timing_info.configured_source_usec = usec;
569 else
570 s->timing_info.configured_sink_usec = usec;
571
572 s->buffer_attr.maxlength = maxlength;
573 s->buffer_attr.fragsize = fragsize;
574 s->buffer_attr.tlength = tlength;
575 s->buffer_attr.prebuf = prebuf;
576 s->buffer_attr.minreq = minreq;
577
578 request_auto_timing_update(s, TRUE);
579
580 if (s->buffer_attr_callback)
581 s->buffer_attr_callback(s, s->buffer_attr_userdata);
582
583 finish:
584 pa_context_unref(c);
585 }
586
587 void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
588 pa_context *c = userdata;
589 pa_stream *s;
590 uint32_t channel;
591 pa_bool_t suspended;
592
593 pa_assert(pd);
594 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
595 pa_assert(t);
596 pa_assert(c);
597 pa_assert(PA_REFCNT_VALUE(c) >= 1);
598
599 pa_context_ref(c);
600
601 if (c->version < 12) {
602 pa_context_fail(c, PA_ERR_PROTOCOL);
603 goto finish;
604 }
605
606 if (pa_tagstruct_getu32(t, &channel) < 0 ||
607 pa_tagstruct_get_boolean(t, &suspended) < 0 ||
608 !pa_tagstruct_eof(t)) {
609 pa_context_fail(c, PA_ERR_PROTOCOL);
610 goto finish;
611 }
612
613 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
614 goto finish;
615
616 if (s->state != PA_STREAM_READY)
617 goto finish;
618
619 s->suspended = suspended;
620
621 check_smoother_status(s, TRUE, FALSE, FALSE);
622 request_auto_timing_update(s, TRUE);
623
624 if (s->suspended_callback)
625 s->suspended_callback(s, s->suspended_userdata);
626
627 finish:
628 pa_context_unref(c);
629 }
630
631 void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
632 pa_context *c = userdata;
633 pa_stream *s;
634 uint32_t channel;
635
636 pa_assert(pd);
637 pa_assert(command == PA_COMMAND_STARTED);
638 pa_assert(t);
639 pa_assert(c);
640 pa_assert(PA_REFCNT_VALUE(c) >= 1);
641
642 pa_context_ref(c);
643
644 if (c->version < 13) {
645 pa_context_fail(c, PA_ERR_PROTOCOL);
646 goto finish;
647 }
648
649 if (pa_tagstruct_getu32(t, &channel) < 0 ||
650 !pa_tagstruct_eof(t)) {
651 pa_context_fail(c, PA_ERR_PROTOCOL);
652 goto finish;
653 }
654
655 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
656 goto finish;
657
658 if (s->state != PA_STREAM_READY)
659 goto finish;
660
661 check_smoother_status(s, TRUE, TRUE, FALSE);
662 request_auto_timing_update(s, TRUE);
663
664 if (s->started_callback)
665 s->started_callback(s, s->started_userdata);
666
667 finish:
668 pa_context_unref(c);
669 }
670
671 void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
672 pa_context *c = userdata;
673 pa_stream *s;
674 uint32_t channel;
675 pa_proplist *pl = NULL;
676 const char *event;
677
678 pa_assert(pd);
679 pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
680 pa_assert(t);
681 pa_assert(c);
682 pa_assert(PA_REFCNT_VALUE(c) >= 1);
683
684 pa_context_ref(c);
685
686 if (c->version < 15) {
687 pa_context_fail(c, PA_ERR_PROTOCOL);
688 goto finish;
689 }
690
691 pl = pa_proplist_new();
692
693 if (pa_tagstruct_getu32(t, &channel) < 0 ||
694 pa_tagstruct_gets(t, &event) < 0 ||
695 pa_tagstruct_get_proplist(t, pl) < 0 ||
696 !pa_tagstruct_eof(t) || !event) {
697 pa_context_fail(c, PA_ERR_PROTOCOL);
698 goto finish;
699 }
700
701 if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
702 goto finish;
703
704 if (s->state != PA_STREAM_READY)
705 goto finish;
706
707 if (s->event_callback)
708 s->event_callback(s, event, pl, s->event_userdata);
709
710 finish:
711 pa_context_unref(c);
712
713 if (pl)
714 pa_proplist_free(pl);
715 }
716
717 void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
718 pa_stream *s;
719 pa_context *c = userdata;
720 uint32_t bytes, channel;
721
722 pa_assert(pd);
723 pa_assert(command == PA_COMMAND_REQUEST);
724 pa_assert(t);
725 pa_assert(c);
726 pa_assert(PA_REFCNT_VALUE(c) >= 1);
727
728 pa_context_ref(c);
729
730 if (pa_tagstruct_getu32(t, &channel) < 0 ||
731 pa_tagstruct_getu32(t, &bytes) < 0 ||
732 !pa_tagstruct_eof(t)) {
733 pa_context_fail(c, PA_ERR_PROTOCOL);
734 goto finish;
735 }
736
737 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
738 goto finish;
739
740 if (s->state != PA_STREAM_READY)
741 goto finish;
742
743 s->requested_bytes += bytes;
744
745 /* pa_log("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes); */
746
747 if (s->requested_bytes > 0 && s->write_callback)
748 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
749
750 finish:
751 pa_context_unref(c);
752 }
753
754 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
755 pa_stream *s;
756 pa_context *c = userdata;
757 uint32_t channel;
758
759 pa_assert(pd);
760 pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
761 pa_assert(t);
762 pa_assert(c);
763 pa_assert(PA_REFCNT_VALUE(c) >= 1);
764
765 pa_context_ref(c);
766
767 if (pa_tagstruct_getu32(t, &channel) < 0 ||
768 !pa_tagstruct_eof(t)) {
769 pa_context_fail(c, PA_ERR_PROTOCOL);
770 goto finish;
771 }
772
773 if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
774 goto finish;
775
776 if (s->state != PA_STREAM_READY)
777 goto finish;
778
779 if (s->buffer_attr.prebuf > 0)
780 check_smoother_status(s, TRUE, FALSE, TRUE);
781
782 request_auto_timing_update(s, TRUE);
783
784 if (command == PA_COMMAND_OVERFLOW) {
785 if (s->overflow_callback)
786 s->overflow_callback(s, s->overflow_userdata);
787 } else if (command == PA_COMMAND_UNDERFLOW) {
788 if (s->underflow_callback)
789 s->underflow_callback(s, s->underflow_userdata);
790 }
791
792 finish:
793 pa_context_unref(c);
794 }
795
796 static void invalidate_indexes(pa_stream *s, pa_bool_t r, pa_bool_t w) {
797 pa_assert(s);
798 pa_assert(PA_REFCNT_VALUE(s) >= 1);
799
800 /* pa_log("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag); */
801
802 if (s->state != PA_STREAM_READY)
803 return;
804
805 if (w) {
806 s->write_index_not_before = s->context->ctag;
807
808 if (s->timing_info_valid)
809 s->timing_info.write_index_corrupt = TRUE;
810
811 /* pa_log("write_index invalidated"); */
812 }
813
814 if (r) {
815 s->read_index_not_before = s->context->ctag;
816
817 if (s->timing_info_valid)
818 s->timing_info.read_index_corrupt = TRUE;
819
820 /* pa_log("read_index invalidated"); */
821 }
822
823 request_auto_timing_update(s, TRUE);
824 }
825
826 static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
827 pa_stream *s = userdata;
828
829 pa_assert(s);
830 pa_assert(PA_REFCNT_VALUE(s) >= 1);
831
832 pa_stream_ref(s);
833 request_auto_timing_update(s, FALSE);
834 pa_stream_unref(s);
835 }
836
837 static void create_stream_complete(pa_stream *s) {
838 pa_assert(s);
839 pa_assert(PA_REFCNT_VALUE(s) >= 1);
840 pa_assert(s->state == PA_STREAM_CREATING);
841
842 pa_stream_set_state(s, PA_STREAM_READY);
843
844 if (s->requested_bytes > 0 && s->write_callback)
845 s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
846
847 if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
848 s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
849 pa_assert(!s->auto_timing_update_event);
850 s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
851
852 request_auto_timing_update(s, TRUE);
853 }
854
855 check_smoother_status(s, TRUE, FALSE, FALSE);
856 }
857
858 static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_sample_spec *ss) {
859 pa_assert(s);
860 pa_assert(attr);
861 pa_assert(ss);
862
863 if (s->context->version >= 13)
864 return;
865
866 /* Version older than 0.9.10 didn't do server side buffer_attr
867 * selection, hence we have to fake it on the client side. */
868
869 /* We choose fairly conservative values here, to not confuse
870 * old clients with extremely large playback buffers */
871
872 if (attr->maxlength == (uint32_t) -1)
873 attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
874
875 if (attr->tlength == (uint32_t) -1)
876 attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
877
878 if (attr->minreq == (uint32_t) -1)
879 attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
880
881 if (attr->prebuf == (uint32_t) -1)
882 attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
883
884 if (attr->fragsize == (uint32_t) -1)
885 attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
886 }
887
888 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
889 pa_stream *s = userdata;
890 uint32_t requested_bytes = 0;
891
892 pa_assert(pd);
893 pa_assert(s);
894 pa_assert(PA_REFCNT_VALUE(s) >= 1);
895 pa_assert(s->state == PA_STREAM_CREATING);
896
897 pa_stream_ref(s);
898
899 if (command != PA_COMMAND_REPLY) {
900 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
901 goto finish;
902
903 pa_stream_set_state(s, PA_STREAM_FAILED);
904 goto finish;
905 }
906
907 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
908 s->channel == PA_INVALID_INDEX ||
909 ((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
910 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
911 pa_context_fail(s->context, PA_ERR_PROTOCOL);
912 goto finish;
913 }
914
915 s->requested_bytes = (int64_t) requested_bytes;
916
917 if (s->context->version >= 9) {
918 if (s->direction == PA_STREAM_PLAYBACK) {
919 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
920 pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
921 pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
922 pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
923 pa_context_fail(s->context, PA_ERR_PROTOCOL);
924 goto finish;
925 }
926 } else if (s->direction == PA_STREAM_RECORD) {
927 if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
928 pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
929 pa_context_fail(s->context, PA_ERR_PROTOCOL);
930 goto finish;
931 }
932 }
933 }
934
935 if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
936 pa_sample_spec ss;
937 pa_channel_map cm;
938 const char *dn = NULL;
939 pa_bool_t suspended;
940
941 if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
942 pa_tagstruct_get_channel_map(t, &cm) < 0 ||
943 pa_tagstruct_getu32(t, &s->device_index) < 0 ||
944 pa_tagstruct_gets(t, &dn) < 0 ||
945 pa_tagstruct_get_boolean(t, &suspended) < 0) {
946 pa_context_fail(s->context, PA_ERR_PROTOCOL);
947 goto finish;
948 }
949
950 if (!dn || s->device_index == PA_INVALID_INDEX ||
951 ss.channels != cm.channels ||
952 !pa_channel_map_valid(&cm) ||
953 !pa_sample_spec_valid(&ss) ||
954 (!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
955 (!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
956 (!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))) {
957 pa_context_fail(s->context, PA_ERR_PROTOCOL);
958 goto finish;
959 }
960
961 pa_xfree(s->device_name);
962 s->device_name = pa_xstrdup(dn);
963 s->suspended = suspended;
964
965 s->channel_map = cm;
966 s->sample_spec = ss;
967 }
968
969 if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
970 pa_usec_t usec;
971
972 if (pa_tagstruct_get_usec(t, &usec) < 0) {
973 pa_context_fail(s->context, PA_ERR_PROTOCOL);
974 goto finish;
975 }
976
977 if (s->direction == PA_STREAM_RECORD)
978 s->timing_info.configured_source_usec = usec;
979 else
980 s->timing_info.configured_sink_usec = usec;
981 }
982
983 if (!pa_tagstruct_eof(t)) {
984 pa_context_fail(s->context, PA_ERR_PROTOCOL);
985 goto finish;
986 }
987
988 if (s->direction == PA_STREAM_RECORD) {
989 pa_assert(!s->record_memblockq);
990
991 s->record_memblockq = pa_memblockq_new(
992 0,
993 s->buffer_attr.maxlength,
994 0,
995 pa_frame_size(&s->sample_spec),
996 1,
997 0,
998 0,
999 NULL);
1000 }
1001
1002 s->channel_valid = TRUE;
1003 pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1004
1005 create_stream_complete(s);
1006
1007 finish:
1008 pa_stream_unref(s);
1009 }
1010
1011 static int create_stream(
1012 pa_stream_direction_t direction,
1013 pa_stream *s,
1014 const char *dev,
1015 const pa_buffer_attr *attr,
1016 pa_stream_flags_t flags,
1017 const pa_cvolume *volume,
1018 pa_stream *sync_stream) {
1019
1020 pa_tagstruct *t;
1021 uint32_t tag;
1022 pa_bool_t volume_set = FALSE;
1023
1024 pa_assert(s);
1025 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1026 pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1027
1028 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1029 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1030 PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1031 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1032 PA_STREAM_INTERPOLATE_TIMING|
1033 PA_STREAM_NOT_MONOTONIC|
1034 PA_STREAM_AUTO_TIMING_UPDATE|
1035 PA_STREAM_NO_REMAP_CHANNELS|
1036 PA_STREAM_NO_REMIX_CHANNELS|
1037 PA_STREAM_FIX_FORMAT|
1038 PA_STREAM_FIX_RATE|
1039 PA_STREAM_FIX_CHANNELS|
1040 PA_STREAM_DONT_MOVE|
1041 PA_STREAM_VARIABLE_RATE|
1042 PA_STREAM_PEAK_DETECT|
1043 PA_STREAM_START_MUTED|
1044 PA_STREAM_ADJUST_LATENCY|
1045 PA_STREAM_EARLY_REQUESTS|
1046 PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1047 PA_STREAM_START_UNMUTED|
1048 PA_STREAM_FAIL_ON_SUSPEND|
1049 PA_STREAM_RELATIVE_VOLUME)), PA_ERR_INVALID);
1050
1051 PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1052 PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1053 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1054 /* Althought some of the other flags are not supported on older
1055 * version, we don't check for them here, because it doesn't hurt
1056 * when they are passed but actually not supported. This makes
1057 * client development easier */
1058
1059 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || !(flags & (PA_STREAM_START_MUTED)), PA_ERR_INVALID);
1060 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1061 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
1062 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1063 PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1064
1065 pa_stream_ref(s);
1066
1067 s->direction = direction;
1068 s->flags = flags;
1069 s->corked = !!(flags & PA_STREAM_START_CORKED);
1070
1071 if (sync_stream)
1072 s->syncid = sync_stream->syncid;
1073
1074 if (attr)
1075 s->buffer_attr = *attr;
1076 automatic_buffer_attr(s, &s->buffer_attr, &s->sample_spec);
1077
1078 if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1079 pa_usec_t x;
1080
1081 x = pa_rtclock_now();
1082
1083 pa_assert(!s->smoother);
1084 s->smoother = pa_smoother_new(
1085 SMOOTHER_ADJUST_TIME,
1086 SMOOTHER_HISTORY_TIME,
1087 !(flags & PA_STREAM_NOT_MONOTONIC),
1088 TRUE,
1089 SMOOTHER_MIN_HISTORY,
1090 x,
1091 TRUE);
1092 }
1093
1094 if (!dev)
1095 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1096
1097 t = pa_tagstruct_command(
1098 s->context,
1099 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1100 &tag);
1101
1102 if (s->context->version < 13)
1103 pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1104
1105 pa_tagstruct_put(
1106 t,
1107 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1108 PA_TAG_CHANNEL_MAP, &s->channel_map,
1109 PA_TAG_U32, PA_INVALID_INDEX,
1110 PA_TAG_STRING, dev,
1111 PA_TAG_U32, s->buffer_attr.maxlength,
1112 PA_TAG_BOOLEAN, s->corked,
1113 PA_TAG_INVALID);
1114
1115 if (s->direction == PA_STREAM_PLAYBACK) {
1116 pa_cvolume cv;
1117
1118 pa_tagstruct_put(
1119 t,
1120 PA_TAG_U32, s->buffer_attr.tlength,
1121 PA_TAG_U32, s->buffer_attr.prebuf,
1122 PA_TAG_U32, s->buffer_attr.minreq,
1123 PA_TAG_U32, s->syncid,
1124 PA_TAG_INVALID);
1125
1126 volume_set = !!volume;
1127
1128 if (!volume)
1129 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1130
1131 pa_tagstruct_put_cvolume(t, volume);
1132 } else
1133 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1134
1135 if (s->context->version >= 12) {
1136 pa_tagstruct_put(
1137 t,
1138 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1139 PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1140 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1141 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1142 PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1143 PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1144 PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1145 PA_TAG_INVALID);
1146 }
1147
1148 if (s->context->version >= 13) {
1149
1150 if (s->direction == PA_STREAM_PLAYBACK)
1151 pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1152 else
1153 pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1154
1155 pa_tagstruct_put(
1156 t,
1157 PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1158 PA_TAG_PROPLIST, s->proplist,
1159 PA_TAG_INVALID);
1160
1161 if (s->direction == PA_STREAM_RECORD)
1162 pa_tagstruct_putu32(t, s->direct_on_input);
1163 }
1164
1165 if (s->context->version >= 14) {
1166
1167 if (s->direction == PA_STREAM_PLAYBACK)
1168 pa_tagstruct_put_boolean(t, volume_set);
1169
1170 pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1171 }
1172
1173 if (s->context->version >= 15) {
1174
1175 if (s->direction == PA_STREAM_PLAYBACK)
1176 pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1177
1178 pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1179 pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1180 }
1181
1182 if (s->context->version >= 17) {
1183
1184 if (s->direction == PA_STREAM_PLAYBACK)
1185 pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1186
1187 }
1188
1189 pa_pstream_send_tagstruct(s->context->pstream, t);
1190 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1191
1192 pa_stream_set_state(s, PA_STREAM_CREATING);
1193
1194 pa_stream_unref(s);
1195 return 0;
1196 }
1197
1198 int pa_stream_connect_playback(
1199 pa_stream *s,
1200 const char *dev,
1201 const pa_buffer_attr *attr,
1202 pa_stream_flags_t flags,
1203 const pa_cvolume *volume,
1204 pa_stream *sync_stream) {
1205
1206 pa_assert(s);
1207 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1208
1209 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1210 }
1211
1212 int pa_stream_connect_record(
1213 pa_stream *s,
1214 const char *dev,
1215 const pa_buffer_attr *attr,
1216 pa_stream_flags_t flags) {
1217
1218 pa_assert(s);
1219 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1220
1221 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1222 }
1223
1224 int pa_stream_begin_write(
1225 pa_stream *s,
1226 void **data,
1227 size_t *nbytes) {
1228
1229 pa_assert(s);
1230 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1231
1232 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1233 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1234 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1235 PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1236 PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1237
1238 if (*nbytes != (size_t) -1) {
1239 size_t m, fs;
1240
1241 m = pa_mempool_block_size_max(s->context->mempool);
1242 fs = pa_frame_size(&s->sample_spec);
1243
1244 m = (m / fs) * fs;
1245 if (*nbytes > m)
1246 *nbytes = m;
1247 }
1248
1249 if (!s->write_memblock) {
1250 s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1251 s->write_data = pa_memblock_acquire(s->write_memblock);
1252 }
1253
1254 *data = s->write_data;
1255 *nbytes = pa_memblock_get_length(s->write_memblock);
1256
1257 return 0;
1258 }
1259
1260 int pa_stream_cancel_write(
1261 pa_stream *s) {
1262
1263 pa_assert(s);
1264 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1265
1266 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1267 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1268 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1269 PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1270
1271 pa_assert(s->write_data);
1272
1273 pa_memblock_release(s->write_memblock);
1274 pa_memblock_unref(s->write_memblock);
1275 s->write_memblock = NULL;
1276 s->write_data = NULL;
1277
1278 return 0;
1279 }
1280
1281 int pa_stream_write(
1282 pa_stream *s,
1283 const void *data,
1284 size_t length,
1285 pa_free_cb_t free_cb,
1286 int64_t offset,
1287 pa_seek_mode_t seek) {
1288
1289 pa_assert(s);
1290 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1291 pa_assert(data);
1292
1293 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1294 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1295 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1296 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1297 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1298 PA_CHECK_VALIDITY(s->context,
1299 !s->write_memblock ||
1300 ((data >= s->write_data) &&
1301 ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1302 PA_ERR_INVALID);
1303 PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1304
1305 if (s->write_memblock) {
1306 pa_memchunk chunk;
1307
1308 /* pa_stream_write_begin() was called before */
1309
1310 pa_memblock_release(s->write_memblock);
1311
1312 chunk.memblock = s->write_memblock;
1313 chunk.index = (const char *) data - (const char *) s->write_data;
1314 chunk.length = length;
1315
1316 s->write_memblock = NULL;
1317 s->write_data = NULL;
1318
1319 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1320 pa_memblock_unref(chunk.memblock);
1321
1322 } else {
1323 pa_seek_mode_t t_seek = seek;
1324 int64_t t_offset = offset;
1325 size_t t_length = length;
1326 const void *t_data = data;
1327
1328 /* pa_stream_write_begin() was not called before */
1329
1330 while (t_length > 0) {
1331 pa_memchunk chunk;
1332
1333 chunk.index = 0;
1334
1335 if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1336 chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1);
1337 chunk.length = t_length;
1338 } else {
1339 void *d;
1340
1341 chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1342 chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1343
1344 d = pa_memblock_acquire(chunk.memblock);
1345 memcpy(d, t_data, chunk.length);
1346 pa_memblock_release(chunk.memblock);
1347 }
1348
1349 pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1350
1351 t_offset = 0;
1352 t_seek = PA_SEEK_RELATIVE;
1353
1354 t_data = (const uint8_t*) t_data + chunk.length;
1355 t_length -= chunk.length;
1356
1357 pa_memblock_unref(chunk.memblock);
1358 }
1359
1360 if (free_cb && pa_pstream_get_shm(s->context->pstream))
1361 free_cb((void*) data);
1362 }
1363
1364 /* This is obviously wrong since we ignore the seeking index . But
1365 * that's OK, the server side applies the same error */
1366 s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1367
1368 /* pa_log("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes); */
1369
1370 if (s->direction == PA_STREAM_PLAYBACK) {
1371
1372 /* Update latency request correction */
1373 if (s->write_index_corrections[s->current_write_index_correction].valid) {
1374
1375 if (seek == PA_SEEK_ABSOLUTE) {
1376 s->write_index_corrections[s->current_write_index_correction].corrupt = FALSE;
1377 s->write_index_corrections[s->current_write_index_correction].absolute = TRUE;
1378 s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1379 } else if (seek == PA_SEEK_RELATIVE) {
1380 if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1381 s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1382 } else
1383 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
1384 }
1385
1386 /* Update the write index in the already available latency data */
1387 if (s->timing_info_valid) {
1388
1389 if (seek == PA_SEEK_ABSOLUTE) {
1390 s->timing_info.write_index_corrupt = FALSE;
1391 s->timing_info.write_index = offset + (int64_t) length;
1392 } else if (seek == PA_SEEK_RELATIVE) {
1393 if (!s->timing_info.write_index_corrupt)
1394 s->timing_info.write_index += offset + (int64_t) length;
1395 } else
1396 s->timing_info.write_index_corrupt = TRUE;
1397 }
1398
1399 if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1400 request_auto_timing_update(s, TRUE);
1401 }
1402
1403 return 0;
1404 }
1405
1406 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1407 pa_assert(s);
1408 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1409 pa_assert(data);
1410 pa_assert(length);
1411
1412 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1413 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1414 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1415
1416 if (!s->peek_memchunk.memblock) {
1417
1418 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1419 *data = NULL;
1420 *length = 0;
1421 return 0;
1422 }
1423
1424 s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1425 }
1426
1427 pa_assert(s->peek_data);
1428 *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1429 *length = s->peek_memchunk.length;
1430 return 0;
1431 }
1432
1433 int pa_stream_drop(pa_stream *s) {
1434 pa_assert(s);
1435 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1436
1437 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1438 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1439 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1440 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
1441
1442 pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1443
1444 /* Fix the simulated local read index */
1445 if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1446 s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1447
1448 pa_assert(s->peek_data);
1449 pa_memblock_release(s->peek_memchunk.memblock);
1450 pa_memblock_unref(s->peek_memchunk.memblock);
1451 pa_memchunk_reset(&s->peek_memchunk);
1452
1453 return 0;
1454 }
1455
1456 size_t pa_stream_writable_size(pa_stream *s) {
1457 pa_assert(s);
1458 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1459
1460 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1461 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1462 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1463
1464 return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1465 }
1466
1467 size_t pa_stream_readable_size(pa_stream *s) {
1468 pa_assert(s);
1469 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1470
1471 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1472 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1473 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1474
1475 return pa_memblockq_get_length(s->record_memblockq);
1476 }
1477
1478 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1479 pa_operation *o;
1480 pa_tagstruct *t;
1481 uint32_t tag;
1482
1483 pa_assert(s);
1484 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1485
1486 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1487 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1488 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1489
1490 /* Ask for a timing update before we cork/uncork to get the best
1491 * accuracy for the transport latency suitable for the
1492 * check_smoother_status() call in the started callback */
1493 request_auto_timing_update(s, TRUE);
1494
1495 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1496
1497 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1498 pa_tagstruct_putu32(t, s->channel);
1499 pa_pstream_send_tagstruct(s->context->pstream, t);
1500 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1501
1502 /* This might cause the read index to conitnue again, hence
1503 * let's request a timing update */
1504 request_auto_timing_update(s, TRUE);
1505
1506 return o;
1507 }
1508
1509 static pa_usec_t calc_time(pa_stream *s, pa_bool_t ignore_transport) {
1510 pa_usec_t usec;
1511
1512 pa_assert(s);
1513 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1514 pa_assert(s->state == PA_STREAM_READY);
1515 pa_assert(s->direction != PA_STREAM_UPLOAD);
1516 pa_assert(s->timing_info_valid);
1517 pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1518 pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1519
1520 if (s->direction == PA_STREAM_PLAYBACK) {
1521 /* The last byte that was written into the output device
1522 * had this time value associated */
1523 usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1524
1525 if (!s->corked && !s->suspended) {
1526
1527 if (!ignore_transport)
1528 /* Because the latency info took a little time to come
1529 * to us, we assume that the real output time is actually
1530 * a little ahead */
1531 usec += s->timing_info.transport_usec;
1532
1533 /* However, the output device usually maintains a buffer
1534 too, hence the real sample currently played is a little
1535 back */
1536 if (s->timing_info.sink_usec >= usec)
1537 usec = 0;
1538 else
1539 usec -= s->timing_info.sink_usec;
1540 }
1541
1542 } else {
1543 pa_assert(s->direction == PA_STREAM_RECORD);
1544
1545 /* The last byte written into the server side queue had
1546 * this time value associated */
1547 usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1548
1549 if (!s->corked && !s->suspended) {
1550
1551 if (!ignore_transport)
1552 /* Add transport latency */
1553 usec += s->timing_info.transport_usec;
1554
1555 /* Add latency of data in device buffer */
1556 usec += s->timing_info.source_usec;
1557
1558 /* If this is a monitor source, we need to correct the
1559 * time by the playback device buffer */
1560 if (s->timing_info.sink_usec >= usec)
1561 usec = 0;
1562 else
1563 usec -= s->timing_info.sink_usec;
1564 }
1565 }
1566
1567 return usec;
1568 }
1569
1570 static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1571 pa_operation *o = userdata;
1572 struct timeval local, remote, now;
1573 pa_timing_info *i;
1574 pa_bool_t playing = FALSE;
1575 uint64_t underrun_for = 0, playing_for = 0;
1576
1577 pa_assert(pd);
1578 pa_assert(o);
1579 pa_assert(PA_REFCNT_VALUE(o) >= 1);
1580
1581 if (!o->context || !o->stream)
1582 goto finish;
1583
1584 i = &o->stream->timing_info;
1585
1586 o->stream->timing_info_valid = FALSE;
1587 i->write_index_corrupt = TRUE;
1588 i->read_index_corrupt = TRUE;
1589
1590 if (command != PA_COMMAND_REPLY) {
1591 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
1592 goto finish;
1593
1594 } else {
1595
1596 if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1597 pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1598 pa_tagstruct_get_boolean(t, &playing) < 0 ||
1599 pa_tagstruct_get_timeval(t, &local) < 0 ||
1600 pa_tagstruct_get_timeval(t, &remote) < 0 ||
1601 pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1602 pa_tagstruct_gets64(t, &i->read_index) < 0) {
1603
1604 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1605 goto finish;
1606 }
1607
1608 if (o->context->version >= 13 &&
1609 o->stream->direction == PA_STREAM_PLAYBACK)
1610 if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1611 pa_tagstruct_getu64(t, &playing_for) < 0) {
1612
1613 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1614 goto finish;
1615 }
1616
1617
1618 if (!pa_tagstruct_eof(t)) {
1619 pa_context_fail(o->context, PA_ERR_PROTOCOL);
1620 goto finish;
1621 }
1622 o->stream->timing_info_valid = TRUE;
1623 i->write_index_corrupt = FALSE;
1624 i->read_index_corrupt = FALSE;
1625
1626 i->playing = (int) playing;
1627 i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1628
1629 pa_gettimeofday(&now);
1630
1631 /* Calculcate timestamps */
1632 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1633 /* local and remote seem to have synchronized clocks */
1634
1635 if (o->stream->direction == PA_STREAM_PLAYBACK)
1636 i->transport_usec = pa_timeval_diff(&remote, &local);
1637 else
1638 i->transport_usec = pa_timeval_diff(&now, &remote);
1639
1640 i->synchronized_clocks = TRUE;
1641 i->timestamp = remote;
1642 } else {
1643 /* clocks are not synchronized, let's estimate latency then */
1644 i->transport_usec = pa_timeval_diff(&now, &local)/2;
1645 i->synchronized_clocks = FALSE;
1646 i->timestamp = local;
1647 pa_timeval_add(&i->timestamp, i->transport_usec);
1648 }
1649
1650 /* Invalidate read and write indexes if necessary */
1651 if (tag < o->stream->read_index_not_before)
1652 i->read_index_corrupt = TRUE;
1653
1654 if (tag < o->stream->write_index_not_before)
1655 i->write_index_corrupt = TRUE;
1656
1657 if (o->stream->direction == PA_STREAM_PLAYBACK) {
1658 /* Write index correction */
1659
1660 int n, j;
1661 uint32_t ctag = tag;
1662
1663 /* Go through the saved correction values and add up the
1664 * total correction.*/
1665 for (n = 0, j = o->stream->current_write_index_correction+1;
1666 n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1667 n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1668
1669 /* Step over invalid data or out-of-date data */
1670 if (!o->stream->write_index_corrections[j].valid ||
1671 o->stream->write_index_corrections[j].tag < ctag)
1672 continue;
1673
1674 /* Make sure that everything is in order */
1675 ctag = o->stream->write_index_corrections[j].tag+1;
1676
1677 /* Now fix the write index */
1678 if (o->stream->write_index_corrections[j].corrupt) {
1679 /* A corrupting seek was made */
1680 i->write_index_corrupt = TRUE;
1681 } else if (o->stream->write_index_corrections[j].absolute) {
1682 /* An absolute seek was made */
1683 i->write_index = o->stream->write_index_corrections[j].value;
1684 i->write_index_corrupt = FALSE;
1685 } else if (!i->write_index_corrupt) {
1686 /* A relative seek was made */
1687 i->write_index += o->stream->write_index_corrections[j].value;
1688 }
1689 }
1690
1691 /* Clear old correction entries */
1692 for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1693 if (!o->stream->write_index_corrections[n].valid)
1694 continue;
1695
1696 if (o->stream->write_index_corrections[n].tag <= tag)
1697 o->stream->write_index_corrections[n].valid = FALSE;
1698 }
1699 }
1700
1701 if (o->stream->direction == PA_STREAM_RECORD) {
1702 /* Read index correction */
1703
1704 if (!i->read_index_corrupt)
1705 i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1706 }
1707
1708 /* Update smoother */
1709 if (o->stream->smoother) {
1710 pa_usec_t u, x;
1711
1712 u = x = pa_rtclock_now() - i->transport_usec;
1713
1714 if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1715 pa_usec_t su;
1716
1717 /* If we weren't playing then it will take some time
1718 * until the audio will actually come out through the
1719 * speakers. Since we follow that timing here, we need
1720 * to try to fix this up */
1721
1722 su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1723
1724 if (su < i->sink_usec)
1725 x += i->sink_usec - su;
1726 }
1727
1728 if (!i->playing)
1729 pa_smoother_pause(o->stream->smoother, x);
1730
1731 /* Update the smoother */
1732 if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1733 (o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1734 pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, TRUE));
1735
1736 if (i->playing)
1737 pa_smoother_resume(o->stream->smoother, x, TRUE);
1738 }
1739 }
1740
1741 o->stream->auto_timing_update_requested = FALSE;
1742
1743 if (o->stream->latency_update_callback)
1744 o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1745
1746 if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1747 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1748 cb(o->stream, o->stream->timing_info_valid, o->userdata);
1749 }
1750
1751 finish:
1752
1753 pa_operation_done(o);
1754 pa_operation_unref(o);
1755 }
1756
1757 pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1758 uint32_t tag;
1759 pa_operation *o;
1760 pa_tagstruct *t;
1761 struct timeval now;
1762 int cidx = 0;
1763
1764 pa_assert(s);
1765 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1766
1767 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1768 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1769 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1770
1771 if (s->direction == PA_STREAM_PLAYBACK) {
1772 /* Find a place to store the write_index correction data for this entry */
1773 cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1774
1775 /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1776 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1777 }
1778 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1779
1780 t = pa_tagstruct_command(
1781 s->context,
1782 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1783 &tag);
1784 pa_tagstruct_putu32(t, s->channel);
1785 pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1786
1787 pa_pstream_send_tagstruct(s->context->pstream, t);
1788 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1789
1790 if (s->direction == PA_STREAM_PLAYBACK) {
1791 /* Fill in initial correction data */
1792
1793 s->current_write_index_correction = cidx;
1794
1795 s->write_index_corrections[cidx].valid = TRUE;
1796 s->write_index_corrections[cidx].absolute = FALSE;
1797 s->write_index_corrections[cidx].corrupt = FALSE;
1798 s->write_index_corrections[cidx].tag = tag;
1799 s->write_index_corrections[cidx].value = 0;
1800 }
1801
1802 return o;
1803 }
1804
1805 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1806 pa_stream *s = userdata;
1807
1808 pa_assert(pd);
1809 pa_assert(s);
1810 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1811
1812 pa_stream_ref(s);
1813
1814 if (command != PA_COMMAND_REPLY) {
1815 if (pa_context_handle_error(s->context, command, t, FALSE) < 0)
1816 goto finish;
1817
1818 pa_stream_set_state(s, PA_STREAM_FAILED);
1819 goto finish;
1820 } else if (!pa_tagstruct_eof(t)) {
1821 pa_context_fail(s->context, PA_ERR_PROTOCOL);
1822 goto finish;
1823 }
1824
1825 pa_stream_set_state(s, PA_STREAM_TERMINATED);
1826
1827 finish:
1828 pa_stream_unref(s);
1829 }
1830
1831 int pa_stream_disconnect(pa_stream *s) {
1832 pa_tagstruct *t;
1833 uint32_t tag;
1834
1835 pa_assert(s);
1836 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1837
1838 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1839 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
1840 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1841
1842 pa_stream_ref(s);
1843
1844 t = pa_tagstruct_command(
1845 s->context,
1846 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
1847 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
1848 &tag);
1849 pa_tagstruct_putu32(t, s->channel);
1850 pa_pstream_send_tagstruct(s->context->pstream, t);
1851 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
1852
1853 pa_stream_unref(s);
1854 return 0;
1855 }
1856
1857 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1858 pa_assert(s);
1859 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1860
1861 if (pa_detect_fork())
1862 return;
1863
1864 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1865 return;
1866
1867 s->read_callback = cb;
1868 s->read_userdata = userdata;
1869 }
1870
1871 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
1872 pa_assert(s);
1873 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1874
1875 if (pa_detect_fork())
1876 return;
1877
1878 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1879 return;
1880
1881 s->write_callback = cb;
1882 s->write_userdata = userdata;
1883 }
1884
1885 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1886 pa_assert(s);
1887 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1888
1889 if (pa_detect_fork())
1890 return;
1891
1892 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1893 return;
1894
1895 s->state_callback = cb;
1896 s->state_userdata = userdata;
1897 }
1898
1899 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1900 pa_assert(s);
1901 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1902
1903 if (pa_detect_fork())
1904 return;
1905
1906 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1907 return;
1908
1909 s->overflow_callback = cb;
1910 s->overflow_userdata = userdata;
1911 }
1912
1913 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1914 pa_assert(s);
1915 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1916
1917 if (pa_detect_fork())
1918 return;
1919
1920 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1921 return;
1922
1923 s->underflow_callback = cb;
1924 s->underflow_userdata = userdata;
1925 }
1926
1927 void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1928 pa_assert(s);
1929 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1930
1931 if (pa_detect_fork())
1932 return;
1933
1934 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1935 return;
1936
1937 s->latency_update_callback = cb;
1938 s->latency_update_userdata = userdata;
1939 }
1940
1941 void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1942 pa_assert(s);
1943 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1944
1945 if (pa_detect_fork())
1946 return;
1947
1948 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1949 return;
1950
1951 s->moved_callback = cb;
1952 s->moved_userdata = userdata;
1953 }
1954
1955 void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1956 pa_assert(s);
1957 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1958
1959 if (pa_detect_fork())
1960 return;
1961
1962 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1963 return;
1964
1965 s->suspended_callback = cb;
1966 s->suspended_userdata = userdata;
1967 }
1968
1969 void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1970 pa_assert(s);
1971 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1972
1973 if (pa_detect_fork())
1974 return;
1975
1976 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1977 return;
1978
1979 s->started_callback = cb;
1980 s->started_userdata = userdata;
1981 }
1982
1983 void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
1984 pa_assert(s);
1985 pa_assert(PA_REFCNT_VALUE(s) >= 1);
1986
1987 if (pa_detect_fork())
1988 return;
1989
1990 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
1991 return;
1992
1993 s->event_callback = cb;
1994 s->event_userdata = userdata;
1995 }
1996
1997 void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
1998 pa_assert(s);
1999 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2000
2001 if (pa_detect_fork())
2002 return;
2003
2004 if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2005 return;
2006
2007 s->buffer_attr_callback = cb;
2008 s->buffer_attr_userdata = userdata;
2009 }
2010
2011 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2012 pa_operation *o = userdata;
2013 int success = 1;
2014
2015 pa_assert(pd);
2016 pa_assert(o);
2017 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2018
2019 if (!o->context)
2020 goto finish;
2021
2022 if (command != PA_COMMAND_REPLY) {
2023 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2024 goto finish;
2025
2026 success = 0;
2027 } else if (!pa_tagstruct_eof(t)) {
2028 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2029 goto finish;
2030 }
2031
2032 if (o->callback) {
2033 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2034 cb(o->stream, success, o->userdata);
2035 }
2036
2037 finish:
2038 pa_operation_done(o);
2039 pa_operation_unref(o);
2040 }
2041
2042 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2043 pa_operation *o;
2044 pa_tagstruct *t;
2045 uint32_t tag;
2046
2047 pa_assert(s);
2048 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2049
2050 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2051 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2052 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2053
2054 /* Ask for a timing update before we cork/uncork to get the best
2055 * accuracy for the transport latency suitable for the
2056 * check_smoother_status() call in the started callback */
2057 request_auto_timing_update(s, TRUE);
2058
2059 s->corked = b;
2060
2061 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2062
2063 t = pa_tagstruct_command(
2064 s->context,
2065 (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2066 &tag);
2067 pa_tagstruct_putu32(t, s->channel);
2068 pa_tagstruct_put_boolean(t, !!b);
2069 pa_pstream_send_tagstruct(s->context->pstream, t);
2070 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2071
2072 check_smoother_status(s, FALSE, FALSE, FALSE);
2073
2074 /* This might cause the indexes to hang/start again, hence let's
2075 * request a timing update, after the cork/uncork, too */
2076 request_auto_timing_update(s, TRUE);
2077
2078 return o;
2079 }
2080
2081 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2082 pa_tagstruct *t;
2083 pa_operation *o;
2084 uint32_t tag;
2085
2086 pa_assert(s);
2087 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2088
2089 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2090 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2091
2092 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2093
2094 t = pa_tagstruct_command(s->context, command, &tag);
2095 pa_tagstruct_putu32(t, s->channel);
2096 pa_pstream_send_tagstruct(s->context->pstream, t);
2097 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2098
2099 return o;
2100 }
2101
2102 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2103 pa_operation *o;
2104
2105 pa_assert(s);
2106 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2107
2108 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2109 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2110 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2111
2112 /* Ask for a timing update *before* the flush, so that the
2113 * transport usec is as up to date as possible when we get the
2114 * underflow message and update the smoother status*/
2115 request_auto_timing_update(s, TRUE);
2116
2117 if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2118 return NULL;
2119
2120 if (s->direction == PA_STREAM_PLAYBACK) {
2121
2122 if (s->write_index_corrections[s->current_write_index_correction].valid)
2123 s->write_index_corrections[s->current_write_index_correction].corrupt = TRUE;
2124
2125 if (s->buffer_attr.prebuf > 0)
2126 check_smoother_status(s, FALSE, FALSE, TRUE);
2127
2128 /* This will change the write index, but leave the
2129 * read index untouched. */
2130 invalidate_indexes(s, FALSE, TRUE);
2131
2132 } else
2133 /* For record streams this has no influence on the write
2134 * index, but the read index might jump. */
2135 invalidate_indexes(s, TRUE, FALSE);
2136
2137 return o;
2138 }
2139
2140 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2141 pa_operation *o;
2142
2143 pa_assert(s);
2144 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2145
2146 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2147 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2148 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2149 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2150
2151 /* Ask for a timing update before we cork/uncork to get the best
2152 * accuracy for the transport latency suitable for the
2153 * check_smoother_status() call in the started callback */
2154 request_auto_timing_update(s, TRUE);
2155
2156 if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2157 return NULL;
2158
2159 /* This might cause the read index to hang again, hence
2160 * let's request a timing update */
2161 request_auto_timing_update(s, TRUE);
2162
2163 return o;
2164 }
2165
2166 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2167 pa_operation *o;
2168
2169 pa_assert(s);
2170 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2171
2172 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2173 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2174 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2175 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2176
2177 /* Ask for a timing update before we cork/uncork to get the best
2178 * accuracy for the transport latency suitable for the
2179 * check_smoother_status() call in the started callback */
2180 request_auto_timing_update(s, TRUE);
2181
2182 if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2183 return NULL;
2184
2185 /* This might cause the read index to start moving again, hence
2186 * let's request a timing update */
2187 request_auto_timing_update(s, TRUE);
2188
2189 return o;
2190 }
2191
2192 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2193 pa_operation *o;
2194
2195 pa_assert(s);
2196 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2197 pa_assert(name);
2198
2199 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2200 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2201 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2202
2203 if (s->context->version >= 13) {
2204 pa_proplist *p = pa_proplist_new();
2205
2206 pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2207 o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2208 pa_proplist_free(p);
2209 } else {
2210 pa_tagstruct *t;
2211 uint32_t tag;
2212
2213 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2214 t = pa_tagstruct_command(
2215 s->context,
2216 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2217 &tag);
2218 pa_tagstruct_putu32(t, s->channel);
2219 pa_tagstruct_puts(t, name);
2220 pa_pstream_send_tagstruct(s->context->pstream, t);
2221 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2222 }
2223
2224 return o;
2225 }
2226
2227 int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2228 pa_usec_t usec;
2229
2230 pa_assert(s);
2231 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2232
2233 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2234 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2235 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2236 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2237 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2238 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2239
2240 if (s->smoother)
2241 usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2242 else
2243 usec = calc_time(s, FALSE);
2244
2245 /* Make sure the time runs monotonically */
2246 if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2247 if (usec < s->previous_time)
2248 usec = s->previous_time;
2249 else
2250 s->previous_time = usec;
2251 }
2252
2253 if (r_usec)
2254 *r_usec = usec;
2255
2256 return 0;
2257 }
2258
2259 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2260 pa_assert(s);
2261 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2262
2263 if (negative)
2264 *negative = 0;
2265
2266 if (a >= b)
2267 return a-b;
2268 else {
2269 if (negative && s->direction == PA_STREAM_RECORD) {
2270 *negative = 1;
2271 return b-a;
2272 } else
2273 return 0;
2274 }
2275 }
2276
2277 int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2278 pa_usec_t t, c;
2279 int r;
2280 int64_t cindex;
2281
2282 pa_assert(s);
2283 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2284 pa_assert(r_usec);
2285
2286 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2287 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2288 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2289 PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2290 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2291 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2292
2293 if ((r = pa_stream_get_time(s, &t)) < 0)
2294 return r;
2295
2296 if (s->direction == PA_STREAM_PLAYBACK)
2297 cindex = s->timing_info.write_index;
2298 else
2299 cindex = s->timing_info.read_index;
2300
2301 if (cindex < 0)
2302 cindex = 0;
2303
2304 c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2305
2306 if (s->direction == PA_STREAM_PLAYBACK)
2307 *r_usec = time_counter_diff(s, c, t, negative);
2308 else
2309 *r_usec = time_counter_diff(s, t, c, negative);
2310
2311 return 0;
2312 }
2313
2314 const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2315 pa_assert(s);
2316 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2317
2318 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2319 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2320 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2321 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2322
2323 return &s->timing_info;
2324 }
2325
2326 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2327 pa_assert(s);
2328 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2329
2330 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2331
2332 return &s->sample_spec;
2333 }
2334
2335 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2336 pa_assert(s);
2337 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2338
2339 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2340
2341 return &s->channel_map;
2342 }
2343
2344 const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2345 pa_assert(s);
2346 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2347
2348 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2349 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2350 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2351
2352 return &s->buffer_attr;
2353 }
2354
2355 static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2356 pa_operation *o = userdata;
2357 int success = 1;
2358
2359 pa_assert(pd);
2360 pa_assert(o);
2361 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2362
2363 if (!o->context)
2364 goto finish;
2365
2366 if (command != PA_COMMAND_REPLY) {
2367 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2368 goto finish;
2369
2370 success = 0;
2371 } else {
2372 if (o->stream->direction == PA_STREAM_PLAYBACK) {
2373 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2374 pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2375 pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2376 pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2377 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2378 goto finish;
2379 }
2380 } else if (o->stream->direction == PA_STREAM_RECORD) {
2381 if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2382 pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2383 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2384 goto finish;
2385 }
2386 }
2387
2388 if (o->stream->context->version >= 13) {
2389 pa_usec_t usec;
2390
2391 if (pa_tagstruct_get_usec(t, &usec) < 0) {
2392 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2393 goto finish;
2394 }
2395
2396 if (o->stream->direction == PA_STREAM_RECORD)
2397 o->stream->timing_info.configured_source_usec = usec;
2398 else
2399 o->stream->timing_info.configured_sink_usec = usec;
2400 }
2401
2402 if (!pa_tagstruct_eof(t)) {
2403 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2404 goto finish;
2405 }
2406 }
2407
2408 if (o->callback) {
2409 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2410 cb(o->stream, success, o->userdata);
2411 }
2412
2413 finish:
2414 pa_operation_done(o);
2415 pa_operation_unref(o);
2416 }
2417
2418
2419 pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2420 pa_operation *o;
2421 pa_tagstruct *t;
2422 uint32_t tag;
2423
2424 pa_assert(s);
2425 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2426 pa_assert(attr);
2427
2428 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2429 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2430 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2431 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2432
2433 /* Ask for a timing update before we cork/uncork to get the best
2434 * accuracy for the transport latency suitable for the
2435 * check_smoother_status() call in the started callback */
2436 request_auto_timing_update(s, TRUE);
2437
2438 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2439
2440 t = pa_tagstruct_command(
2441 s->context,
2442 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2443 &tag);
2444 pa_tagstruct_putu32(t, s->channel);
2445
2446 pa_tagstruct_putu32(t, attr->maxlength);
2447
2448 if (s->direction == PA_STREAM_PLAYBACK)
2449 pa_tagstruct_put(
2450 t,
2451 PA_TAG_U32, attr->tlength,
2452 PA_TAG_U32, attr->prebuf,
2453 PA_TAG_U32, attr->minreq,
2454 PA_TAG_INVALID);
2455 else
2456 pa_tagstruct_putu32(t, attr->fragsize);
2457
2458 if (s->context->version >= 13)
2459 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2460
2461 if (s->context->version >= 14)
2462 pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2463
2464 pa_pstream_send_tagstruct(s->context->pstream, t);
2465 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2466
2467 /* This might cause changes in the read/write indexex, hence let's
2468 * request a timing update */
2469 request_auto_timing_update(s, TRUE);
2470
2471 return o;
2472 }
2473
2474 uint32_t pa_stream_get_device_index(pa_stream *s) {
2475 pa_assert(s);
2476 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2477
2478 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2479 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2480 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2481 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2482 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2483
2484 return s->device_index;
2485 }
2486
2487 const char *pa_stream_get_device_name(pa_stream *s) {
2488 pa_assert(s);
2489 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2490
2491 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2492 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2493 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2494 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2495 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2496
2497 return s->device_name;
2498 }
2499
2500 int pa_stream_is_suspended(pa_stream *s) {
2501 pa_assert(s);
2502 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2503
2504 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2505 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2506 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2507 PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2508
2509 return s->suspended;
2510 }
2511
2512 int pa_stream_is_corked(pa_stream *s) {
2513 pa_assert(s);
2514 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2515
2516 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2517 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2518 PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2519
2520 return s->corked;
2521 }
2522
2523 static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2524 pa_operation *o = userdata;
2525 int success = 1;
2526
2527 pa_assert(pd);
2528 pa_assert(o);
2529 pa_assert(PA_REFCNT_VALUE(o) >= 1);
2530
2531 if (!o->context)
2532 goto finish;
2533
2534 if (command != PA_COMMAND_REPLY) {
2535 if (pa_context_handle_error(o->context, command, t, FALSE) < 0)
2536 goto finish;
2537
2538 success = 0;
2539 } else {
2540
2541 if (!pa_tagstruct_eof(t)) {
2542 pa_context_fail(o->context, PA_ERR_PROTOCOL);
2543 goto finish;
2544 }
2545 }
2546
2547 o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2548 pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2549
2550 if (o->callback) {
2551 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2552 cb(o->stream, success, o->userdata);
2553 }
2554
2555 finish:
2556 pa_operation_done(o);
2557 pa_operation_unref(o);
2558 }
2559
2560
2561 pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2562 pa_operation *o;
2563 pa_tagstruct *t;
2564 uint32_t tag;
2565
2566 pa_assert(s);
2567 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2568
2569 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2570 PA_CHECK_VALIDITY_RETURN_NULL(s->context, rate > 0 && rate <= PA_RATE_MAX, PA_ERR_INVALID);
2571 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2572 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2573 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2574 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2575
2576 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2577 o->private = PA_UINT_TO_PTR(rate);
2578
2579 t = pa_tagstruct_command(
2580 s->context,
2581 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2582 &tag);
2583 pa_tagstruct_putu32(t, s->channel);
2584 pa_tagstruct_putu32(t, rate);
2585
2586 pa_pstream_send_tagstruct(s->context->pstream, t);
2587 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2588
2589 return o;
2590 }
2591
2592 pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2593 pa_operation *o;
2594 pa_tagstruct *t;
2595 uint32_t tag;
2596
2597 pa_assert(s);
2598 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2599
2600 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2601 PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2602 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2603 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2604 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2605
2606 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2607
2608 t = pa_tagstruct_command(
2609 s->context,
2610 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2611 &tag);
2612 pa_tagstruct_putu32(t, s->channel);
2613 pa_tagstruct_putu32(t, (uint32_t) mode);
2614 pa_tagstruct_put_proplist(t, p);
2615
2616 pa_pstream_send_tagstruct(s->context->pstream, t);
2617 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2618
2619 /* Please note that we don't update s->proplist here, because we
2620 * don't export that field */
2621
2622 return o;
2623 }
2624
2625 pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2626 pa_operation *o;
2627 pa_tagstruct *t;
2628 uint32_t tag;
2629 const char * const*k;
2630
2631 pa_assert(s);
2632 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2633
2634 PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2635 PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2636 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2637 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2638 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2639
2640 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2641
2642 t = pa_tagstruct_command(
2643 s->context,
2644 (uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2645 &tag);
2646 pa_tagstruct_putu32(t, s->channel);
2647
2648 for (k = keys; *k; k++)
2649 pa_tagstruct_puts(t, *k);
2650
2651 pa_tagstruct_puts(t, NULL);
2652
2653 pa_pstream_send_tagstruct(s->context->pstream, t);
2654 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2655
2656 /* Please note that we don't update s->proplist here, because we
2657 * don't export that field */
2658
2659 return o;
2660 }
2661
2662 int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2663 pa_assert(s);
2664 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2665
2666 PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2667 PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2668 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2669 PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2670
2671 s->direct_on_input = sink_input_idx;
2672
2673 return 0;
2674 }
2675
2676 uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2677 pa_assert(s);
2678 pa_assert(PA_REFCNT_VALUE(s) >= 1);
2679
2680 PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2681 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2682 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2683
2684 return s->direct_on_input;
2685 }