]> code.delx.au - pulseaudio/blob - src/polyp/stream.c
f4436ff5e73678b0a3f895c246df157aa8a6a687
[pulseaudio] / src / polyp / stream.c
1 /* $Id$ */
2
3 /***
4 This file is part of polypaudio.
5
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
10
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <assert.h>
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include <polyp/def.h>
32 #include <polypcore/xmalloc.h>
33 #include <polypcore/pstream-util.h>
34 #include <polypcore/util.h>
35 #include <polypcore/log.h>
36 #include <polypcore/hashmap.h>
37
38 #include "internal.h"
39
40 #define LATENCY_IPOL_INTERVAL_USEC (10000L)
41 #define COUNTER_HASHMAP_MAXSIZE (5)
42
43 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
44 pa_stream *s;
45
46 assert(c);
47
48 PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
49 PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
50
51 s = pa_xnew(pa_stream, 1);
52 s->ref = 1;
53 s->context = c;
54 s->mainloop = c->mainloop;
55
56 s->read_callback = NULL;
57 s->read_userdata = NULL;
58 s->write_callback = NULL;
59 s->write_userdata = NULL;
60 s->state_callback = NULL;
61 s->state_userdata = NULL;
62 s->overflow_callback = NULL;
63 s->overflow_userdata = NULL;
64 s->underflow_callback = NULL;
65 s->underflow_userdata = NULL;
66
67 s->direction = PA_STREAM_NODIRECTION;
68 s->name = pa_xstrdup(name);
69 s->sample_spec = *ss;
70
71 if (map)
72 s->channel_map = *map;
73 else
74 pa_channel_map_init_auto(&s->channel_map, ss->channels);
75
76 s->channel = 0;
77 s->channel_valid = 0;
78 s->syncid = c->csyncid++;
79 s->device_index = PA_INVALID_INDEX;
80 s->requested_bytes = 0;
81 s->state = PA_STREAM_UNCONNECTED;
82 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
83
84 s->peek_memchunk.index = 0;
85 s->peek_memchunk.length = 0;
86 s->peek_memchunk.memblock = NULL;
87
88 s->record_memblockq = NULL;
89
90 s->counter_hashmap = pa_hashmap_new(NULL, NULL);
91
92 s->counter = 0;
93 s->previous_time = 0;
94 s->previous_ipol_time = 0;
95
96 s->corked = 0;
97 s->interpolate = 0;
98
99 s->ipol_usec = 0;
100 memset(&s->ipol_timestamp, 0, sizeof(s->ipol_timestamp));
101 s->ipol_event = NULL;
102 s->ipol_requested = 0;
103
104 PA_LLIST_PREPEND(pa_stream, c->streams, s);
105
106 /* The context and stream will point at each other. We cannot ref count
107 both though since that will create a loop. */
108 pa_context_ref(s->context);
109
110 return s;
111 }
112
113 static void hashmap_free_func(void *p, void *userdata) {
114 pa_xfree(p);
115 }
116
117 static void stream_free(pa_stream *s) {
118 assert(s && s->context && !s->channel_valid);
119
120 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
121
122 pa_context_unref(s->context);
123
124 if (s->ipol_event) {
125 assert(s->mainloop);
126 s->mainloop->time_free(s->ipol_event);
127 }
128
129 if (s->peek_memchunk.memblock)
130 pa_memblock_unref(s->peek_memchunk.memblock);
131
132 if (s->record_memblockq)
133 pa_memblockq_free(s->record_memblockq);
134
135 if (s->counter_hashmap)
136 pa_hashmap_free(s->counter_hashmap, hashmap_free_func, NULL);
137
138 pa_xfree(s->name);
139 pa_xfree(s);
140 }
141
142 void pa_stream_unref(pa_stream *s) {
143 assert(s);
144 assert(s->ref >= 1);
145
146 if (--(s->ref) == 0)
147 stream_free(s);
148 }
149
150 pa_stream* pa_stream_ref(pa_stream *s) {
151 assert(s);
152 assert(s->ref >= 1);
153
154 s->ref++;
155 return s;
156 }
157
158 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
159 assert(s);
160 assert(s->ref >= 1);
161
162 return s->state;
163 }
164
165 pa_context* pa_stream_get_context(pa_stream *s) {
166 assert(s);
167 assert(s->ref >= 1);
168
169 return s->context;
170 }
171
172 uint32_t pa_stream_get_index(pa_stream *s) {
173 assert(s);
174 assert(s->ref >= 1);
175
176 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
177
178 return s->device_index;
179 }
180
181 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
182 assert(s);
183 assert(s->ref >= 1);
184
185 if (s->state == st)
186 return;
187
188 pa_stream_ref(s);
189
190 s->state = st;
191
192 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) {
193 /* Detach from context */
194
195 if (s->channel_valid)
196 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
197
198 s->channel = 0;
199 s->channel_valid = 0;
200
201 /* We keep a ref as long as we're connected */
202 pa_stream_unref(s);
203 }
204
205 if (s->state_callback)
206 s->state_callback(s, s->state_userdata);
207
208 pa_stream_unref(s);
209 }
210
211 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
212 pa_context *c = userdata;
213 pa_stream *s;
214 uint32_t channel;
215
216 assert(pd);
217 assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
218 assert(t);
219 assert(c);
220
221 pa_context_ref(c);
222
223 if (pa_tagstruct_getu32(t, &channel) < 0 ||
224 !pa_tagstruct_eof(t)) {
225 pa_context_fail(c, PA_ERR_PROTOCOL);
226 goto finish;
227 }
228
229 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
230 goto finish;
231
232 pa_context_set_error(c, PA_ERR_KILLED);
233 pa_stream_set_state(s, PA_STREAM_FAILED);
234
235 finish:
236 pa_context_unref(c);
237 }
238
239 void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
240 pa_stream *s;
241 pa_context *c = userdata;
242 uint32_t bytes, channel;
243
244 assert(pd);
245 assert(command == PA_COMMAND_REQUEST);
246 assert(t);
247 assert(c);
248
249 pa_context_ref(c);
250
251 if (pa_tagstruct_getu32(t, &channel) < 0 ||
252 pa_tagstruct_getu32(t, &bytes) < 0 ||
253 !pa_tagstruct_eof(t)) {
254 pa_context_fail(c, PA_ERR_PROTOCOL);
255 goto finish;
256 }
257
258 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
259 goto finish;
260
261 if (s->state == PA_STREAM_READY) {
262 s->requested_bytes += bytes;
263
264 if (s->requested_bytes > 0 && s->write_callback)
265 s->write_callback(s, s->requested_bytes, s->write_userdata);
266 }
267
268 finish:
269 pa_context_unref(c);
270 }
271
272 void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
273 pa_stream *s;
274 pa_context *c = userdata;
275 uint32_t channel;
276
277 assert(pd);
278 assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
279 assert(t);
280 assert(c);
281
282 pa_context_ref(c);
283
284 if (pa_tagstruct_getu32(t, &channel) < 0 ||
285 !pa_tagstruct_eof(t)) {
286 pa_context_fail(c, PA_ERR_PROTOCOL);
287 goto finish;
288 }
289
290 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
291 goto finish;
292
293 if (s->state == PA_STREAM_READY) {
294
295 if (command == PA_COMMAND_OVERFLOW) {
296 if (s->overflow_callback)
297 s->overflow_callback(s, s->overflow_userdata);
298 } else if (command == PA_COMMAND_UNDERFLOW) {
299 if (s->underflow_callback)
300 s->underflow_callback(s, s->underflow_userdata);
301 }
302 }
303
304 finish:
305 pa_context_unref(c);
306 }
307
308 static void ipol_callback(pa_mainloop_api *m, pa_time_event *e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) {
309 struct timeval next;
310 pa_stream *s = userdata;
311
312 pa_stream_ref(s);
313
314 /* pa_log("requesting new ipol data"); */
315
316 if (s->state == PA_STREAM_READY && !s->ipol_requested) {
317 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
318 s->ipol_requested = 1;
319 }
320
321 pa_gettimeofday(&next);
322 pa_timeval_add(&next, LATENCY_IPOL_INTERVAL_USEC);
323 m->time_restart(e, &next);
324
325 pa_stream_unref(s);
326 }
327
328
329 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
330 pa_stream *s = userdata;
331
332 assert(pd);
333 assert(t);
334 assert(s);
335 assert(s->state == PA_STREAM_CREATING);
336
337 pa_stream_ref(s);
338
339 if (command != PA_COMMAND_REPLY) {
340 if (pa_context_handle_error(s->context, command, t) < 0)
341 goto finish;
342
343 pa_stream_set_state(s, PA_STREAM_FAILED);
344 goto finish;
345 }
346
347 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
348 ((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) ||
349 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0) ||
350 !pa_tagstruct_eof(t)) {
351 pa_context_fail(s->context, PA_ERR_PROTOCOL);
352 goto finish;
353 }
354
355 if (s->direction == PA_STREAM_RECORD) {
356 assert(!s->record_memblockq);
357
358 s->record_memblockq = pa_memblockq_new(
359 0,
360 s->buffer_attr.maxlength,
361 0,
362 pa_frame_size(&s->sample_spec),
363 1,
364 0,
365 NULL,
366 s->context->memblock_stat);
367 }
368
369 s->channel_valid = 1;
370 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
371
372 /* We add an extra ref as long as we're connected (i.e. in the dynarray) */
373 pa_stream_ref(s);
374
375 if (s->interpolate) {
376 struct timeval tv;
377 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
378
379 pa_gettimeofday(&tv);
380 tv.tv_usec += LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
381
382 assert(!s->ipol_event);
383 s->ipol_event = s->mainloop->time_new(s->mainloop, &tv, &ipol_callback, s);
384 }
385
386 pa_stream_set_state(s, PA_STREAM_READY);
387
388 if (s->requested_bytes > 0 && s->ref > 1 && s->write_callback)
389 s->write_callback(s, s->requested_bytes, s->write_userdata);
390
391 finish:
392 pa_stream_unref(s);
393 }
394
395 static int create_stream(
396 pa_stream_direction_t direction,
397 pa_stream *s,
398 const char *dev,
399 const pa_buffer_attr *attr,
400 pa_stream_flags_t flags,
401 const pa_cvolume *volume,
402 pa_stream *sync_stream) {
403
404 pa_tagstruct *t;
405 uint32_t tag;
406
407 assert(s);
408 assert(s->ref >= 1);
409
410 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
411 PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|PA_STREAM_INTERPOLATE_LATENCY)), PA_ERR_INVALID);
412 PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || flags == 0, PA_ERR_INVALID);
413 PA_CHECK_VALIDITY(s->context, !volume || volume->channels == s->sample_spec.channels, PA_ERR_INVALID);
414 PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
415
416 pa_stream_ref(s);
417
418 s->direction = direction;
419
420 if (sync_stream)
421 s->syncid = sync_stream->syncid;
422
423 s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY);
424 pa_stream_trash_ipol(s);
425
426 if (attr)
427 s->buffer_attr = *attr;
428 else {
429 /* half a second */
430 s->buffer_attr.tlength = pa_bytes_per_second(&s->sample_spec)/2;
431 s->buffer_attr.maxlength = (s->buffer_attr.tlength*3)/2;
432 s->buffer_attr.minreq = s->buffer_attr.tlength/100;
433 s->buffer_attr.prebuf = s->buffer_attr.tlength - s->buffer_attr.minreq;
434 s->buffer_attr.fragsize = s->buffer_attr.tlength/100;
435 }
436
437 if (!dev)
438 dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
439
440 t = pa_tagstruct_command(
441 s->context,
442 s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
443 &tag);
444
445 pa_tagstruct_put(
446 t,
447 PA_TAG_STRING, s->name,
448 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
449 PA_TAG_CHANNEL_MAP, &s->channel_map,
450 PA_TAG_U32, PA_INVALID_INDEX,
451 PA_TAG_STRING, dev,
452 PA_TAG_U32, s->buffer_attr.maxlength,
453 PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
454 PA_TAG_INVALID);
455
456 if (s->direction == PA_STREAM_PLAYBACK) {
457 pa_cvolume cv;
458
459 pa_tagstruct_put(
460 t,
461 PA_TAG_U32, s->buffer_attr.tlength,
462 PA_TAG_U32, s->buffer_attr.prebuf,
463 PA_TAG_U32, s->buffer_attr.minreq,
464 PA_TAG_U32, s->syncid,
465 PA_TAG_INVALID);
466
467 if (!volume)
468 volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
469
470 pa_tagstruct_put_cvolume(t, volume);
471 } else
472 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
473
474 pa_pstream_send_tagstruct(s->context->pstream, t);
475 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s);
476
477 pa_stream_set_state(s, PA_STREAM_CREATING);
478
479 pa_stream_unref(s);
480 return 0;
481 }
482
483 int pa_stream_connect_playback(
484 pa_stream *s,
485 const char *dev,
486 const pa_buffer_attr *attr,
487 pa_stream_flags_t flags,
488 pa_cvolume *volume,
489 pa_stream *sync_stream) {
490
491 assert(s);
492 assert(s->ref >= 1);
493
494 return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
495 }
496
497 int pa_stream_connect_record(
498 pa_stream *s,
499 const char *dev,
500 const pa_buffer_attr *attr,
501 pa_stream_flags_t flags) {
502
503 assert(s);
504 assert(s->ref >= 1);
505
506 return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
507 }
508
509 int pa_stream_write(
510 pa_stream *s,
511 const void *data,
512 size_t length,
513 void (*free_cb)(void *p),
514 int64_t offset,
515 pa_seek_mode_t seek) {
516
517 pa_memchunk chunk;
518
519 assert(s);
520 assert(s->ref >= 1);
521 assert(data);
522
523 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
524 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
525 PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
526 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
527
528 if (length <= 0)
529 return 0;
530
531 if (free_cb)
532 chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
533 else {
534 chunk.memblock = pa_memblock_new(length, s->context->memblock_stat);
535 memcpy(chunk.memblock->data, data, length);
536 }
537
538 chunk.index = 0;
539 chunk.length = length;
540
541 pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
542 pa_memblock_unref(chunk.memblock);
543
544 if (length < s->requested_bytes)
545 s->requested_bytes -= length;
546 else
547 s->requested_bytes = 0;
548
549 s->counter += length;
550 return 0;
551 }
552
553 int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
554 assert(s);
555 assert(s->ref >= 1);
556 assert(data);
557 assert(length);
558
559 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
560 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
561
562 if (!s->peek_memchunk.memblock) {
563
564 if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
565 *data = NULL;
566 *length = 0;
567 return 0;
568 }
569 }
570
571 *data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index;
572 *length = s->peek_memchunk.length;
573 return 0;
574 }
575
576 int pa_stream_drop(pa_stream *s) {
577 assert(s);
578 assert(s->ref >= 1);
579
580 PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
581 PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
582 PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
583
584 pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
585
586 pa_memblock_unref(s->peek_memchunk.memblock);
587 s->peek_memchunk.length = 0;
588 s->peek_memchunk.index = 0;
589 s->peek_memchunk.memblock = NULL;
590
591 s->counter += s->peek_memchunk.length;
592 return 0;
593 }
594
595 size_t pa_stream_writable_size(pa_stream *s) {
596 assert(s);
597 assert(s->ref >= 1);
598
599 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
600 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE, (size_t) -1);
601
602 return s->requested_bytes;
603 }
604
605 size_t pa_stream_readable_size(pa_stream *s) {
606 assert(s);
607 assert(s->ref >= 1);
608
609 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
610 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
611
612 return pa_memblockq_get_length(s->record_memblockq);
613 }
614
615 pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
616 pa_operation *o;
617 pa_tagstruct *t;
618 uint32_t tag;
619
620 assert(s);
621 assert(s->ref >= 1);
622
623 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
624 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
625
626 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
627
628 t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
629 pa_tagstruct_putu32(t, s->channel);
630 pa_pstream_send_tagstruct(s->context->pstream, t);
631 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
632
633 return pa_operation_ref(o);
634 }
635
636 static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
637 pa_operation *o = userdata;
638 pa_latency_info i, *p = NULL;
639 struct timeval local, remote, now;
640
641 assert(pd);
642 assert(o);
643 assert(o->stream);
644 assert(o->context);
645
646 i.counter = *(uint64_t*)pa_hashmap_get(o->stream->counter_hashmap, (void*)(unsigned long)tag);
647 pa_xfree(pa_hashmap_remove(o->stream->counter_hashmap, (void*)(unsigned long)tag));
648
649 if (command != PA_COMMAND_REPLY) {
650 if (pa_context_handle_error(o->context, command, t) < 0)
651 goto finish;
652
653 } else if (pa_tagstruct_get_usec(t, &i.buffer_usec) < 0 ||
654 pa_tagstruct_get_usec(t, &i.sink_usec) < 0 ||
655 pa_tagstruct_get_usec(t, &i.source_usec) < 0 ||
656 pa_tagstruct_get_boolean(t, &i.playing) < 0 ||
657 pa_tagstruct_getu32(t, &i.queue_length) < 0 ||
658 pa_tagstruct_get_timeval(t, &local) < 0 ||
659 pa_tagstruct_get_timeval(t, &remote) < 0 ||
660 pa_tagstruct_gets64(t, &i.write_index) < 0 ||
661 pa_tagstruct_gets64(t, &i.read_index) < 0 ||
662 !pa_tagstruct_eof(t)) {
663 pa_context_fail(o->context, PA_ERR_PROTOCOL);
664 goto finish;
665 } else {
666 pa_gettimeofday(&now);
667
668 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
669 /* local and remote seem to have synchronized clocks */
670
671 if (o->stream->direction == PA_STREAM_PLAYBACK)
672 i.transport_usec = pa_timeval_diff(&remote, &local);
673 else
674 i.transport_usec = pa_timeval_diff(&now, &remote);
675
676 i.synchronized_clocks = 1;
677 i.timestamp = remote;
678 } else {
679 /* clocks are not synchronized, let's estimate latency then */
680 i.transport_usec = pa_timeval_diff(&now, &local)/2;
681 i.synchronized_clocks = 0;
682 i.timestamp = local;
683 pa_timeval_add(&i.timestamp, i.transport_usec);
684 }
685
686 if (o->stream->interpolate) {
687 /* pa_log("new interpol data"); */
688 o->stream->ipol_timestamp = i.timestamp;
689 o->stream->ipol_usec = pa_stream_get_time(o->stream, &i);
690 o->stream->ipol_requested = 0;
691 }
692
693 p = &i;
694 }
695
696 if (o->callback) {
697 pa_stream_get_latency_info_cb_t cb = (pa_stream_get_latency_info_cb_t) o->callback;
698 cb(o->stream, p, o->userdata);
699 }
700
701 finish:
702 pa_operation_done(o);
703 pa_operation_unref(o);
704 }
705
706 pa_operation* pa_stream_get_latency_info(pa_stream *s, pa_stream_get_latency_info_cb_t cb, void *userdata) {
707 uint32_t tag;
708 pa_operation *o;
709 pa_tagstruct *t;
710 struct timeval now;
711 uint64_t *counter;
712
713 assert(s);
714 assert(s->ref >= 1);
715
716 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
717 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
718 PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_hashmap_size(s->counter_hashmap) < COUNTER_HASHMAP_MAXSIZE, PA_ERR_INTERNAL);
719
720 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
721
722 t = pa_tagstruct_command(
723 s->context,
724 s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY,
725 &tag);
726 pa_tagstruct_putu32(t, s->channel);
727
728 pa_gettimeofday(&now);
729 pa_tagstruct_put_timeval(t, &now);
730
731 pa_pstream_send_tagstruct(s->context->pstream, t);
732 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_info_callback, o);
733
734 counter = pa_xmalloc(sizeof(uint64_t));
735 *counter = s->counter;
736 pa_hashmap_put(s->counter_hashmap, (void*)(unsigned long)tag, counter);
737
738 return pa_operation_ref(o);
739 }
740
741 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
742 pa_stream *s = userdata;
743
744 assert(pd);
745 assert(s);
746 assert(s->ref >= 1);
747
748 pa_stream_ref(s);
749
750 if (command != PA_COMMAND_REPLY) {
751 if (pa_context_handle_error(s->context, command, t) < 0)
752 goto finish;
753
754 pa_stream_set_state(s, PA_STREAM_FAILED);
755 goto finish;
756 } else if (!pa_tagstruct_eof(t)) {
757 pa_context_fail(s->context, PA_ERR_PROTOCOL);
758 goto finish;
759 }
760
761 pa_stream_set_state(s, PA_STREAM_TERMINATED);
762
763 finish:
764 pa_stream_unref(s);
765 }
766
767 int pa_stream_disconnect(pa_stream *s) {
768 pa_tagstruct *t;
769 uint32_t tag;
770
771 assert(s);
772 assert(s->ref >= 1);
773
774 PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
775 PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
776
777 pa_stream_ref(s);
778
779 t = pa_tagstruct_command(
780 s->context,
781 s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
782 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM),
783 &tag);
784 pa_tagstruct_putu32(t, s->channel);
785 pa_pstream_send_tagstruct(s->context->pstream, t);
786 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s);
787
788 pa_stream_unref(s);
789 return 0;
790 }
791
792 void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
793 assert(s);
794 assert(s->ref >= 1);
795
796 s->read_callback = cb;
797 s->read_userdata = userdata;
798 }
799
800 void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
801 assert(s);
802 assert(s->ref >= 1);
803
804 s->write_callback = cb;
805 s->write_userdata = userdata;
806 }
807
808 void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
809 assert(s);
810 assert(s->ref >= 1);
811
812 s->state_callback = cb;
813 s->state_userdata = userdata;
814 }
815
816 void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
817 assert(s);
818 assert(s->ref >= 1);
819
820 s->overflow_callback = cb;
821 s->overflow_userdata = userdata;
822 }
823
824 void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
825 assert(s);
826 assert(s->ref >= 1);
827
828 s->underflow_callback = cb;
829 s->underflow_userdata = userdata;
830 }
831
832 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
833 pa_operation *o = userdata;
834 int success = 1;
835
836 assert(pd);
837 assert(o);
838 assert(o->context);
839 assert(o->ref >= 1);
840
841 if (command != PA_COMMAND_REPLY) {
842 if (pa_context_handle_error(o->context, command, t) < 0)
843 goto finish;
844
845 success = 0;
846 } else if (!pa_tagstruct_eof(t)) {
847 pa_context_fail(o->context, PA_ERR_PROTOCOL);
848 goto finish;
849 }
850
851 if (o->callback) {
852 pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
853 cb(o->stream, success, o->userdata);
854 }
855
856 finish:
857 pa_operation_done(o);
858 pa_operation_unref(o);
859 }
860
861 pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
862 pa_operation *o;
863 pa_tagstruct *t;
864 uint32_t tag;
865
866 assert(s);
867 assert(s->ref >= 1);
868
869 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
870 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
871
872 if (s->interpolate) {
873 if (!s->corked && b)
874 /* Pausing */
875 s->ipol_usec = pa_stream_get_interpolated_time(s);
876 else if (s->corked && !b)
877 /* Unpausing */
878 pa_gettimeofday(&s->ipol_timestamp);
879 }
880
881 s->corked = b;
882
883 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
884
885 t = pa_tagstruct_command(
886 s->context,
887 s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM,
888 &tag);
889 pa_tagstruct_putu32(t, s->channel);
890 pa_tagstruct_put_boolean(t, !!b);
891 pa_pstream_send_tagstruct(s->context->pstream, t);
892 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
893
894 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
895
896 return pa_operation_ref(o);
897 }
898
899 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
900 pa_tagstruct *t;
901 pa_operation *o;
902 uint32_t tag;
903
904 assert(s);
905 assert(s->ref >= 1);
906
907 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
908
909 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
910
911 t = pa_tagstruct_command(s->context, command, &tag);
912 pa_tagstruct_putu32(t, s->channel);
913 pa_pstream_send_tagstruct(s->context->pstream, t);
914 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
915
916 return pa_operation_ref(o);
917 }
918
919 pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
920 pa_operation *o;
921
922 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
923
924 if ((o = stream_send_simple_command(s, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM, cb, userdata)))
925 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
926
927 return o;
928 }
929
930 pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
931 pa_operation *o;
932
933 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
934
935 if ((o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
936 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
937
938 return o;
939 }
940
941 pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
942 pa_operation *o;
943
944 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
945
946 if ((o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
947 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
948
949 return o;
950 }
951
952 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
953 pa_operation *o;
954 pa_tagstruct *t;
955 uint32_t tag;
956
957 assert(s);
958 assert(s->ref >= 1);
959 assert(name);
960
961 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
962 PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
963
964 o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
965
966 t = pa_tagstruct_command(
967 s->context,
968 s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME,
969 &tag);
970 pa_tagstruct_putu32(t, s->channel);
971 pa_tagstruct_puts(t, name);
972 pa_pstream_send_tagstruct(s->context->pstream, t);
973 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
974
975 return pa_operation_ref(o);
976 }
977
978 uint64_t pa_stream_get_counter(pa_stream *s) {
979 assert(s);
980 assert(s->ref >= 1);
981
982 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (uint64_t) -1);
983
984 return s->counter;
985 }
986
987 pa_usec_t pa_stream_get_time(pa_stream *s, const pa_latency_info *i) {
988 pa_usec_t usec;
989
990 assert(s);
991 assert(s->ref >= 1);
992
993 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1);
994 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1);
995
996 usec = pa_bytes_to_usec(i->counter, &s->sample_spec);
997
998 if (i) {
999 if (s->direction == PA_STREAM_PLAYBACK) {
1000 pa_usec_t latency = i->transport_usec + i->buffer_usec + i->sink_usec;
1001 if (usec < latency)
1002 usec = 0;
1003 else
1004 usec -= latency;
1005
1006 } else if (s->direction == PA_STREAM_RECORD) {
1007 usec += i->source_usec + i->buffer_usec + i->transport_usec;
1008
1009 if (usec > i->sink_usec)
1010 usec -= i->sink_usec;
1011 else
1012 usec = 0;
1013 }
1014 }
1015
1016 if (usec < s->previous_time)
1017 usec = s->previous_time;
1018
1019 s->previous_time = usec;
1020
1021 return usec;
1022 }
1023
1024 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t t, pa_usec_t c, int *negative) {
1025 assert(s);
1026 assert(s->ref >= 1);
1027
1028 if (negative)
1029 *negative = 0;
1030
1031 if (c < t) {
1032 if (s->direction == PA_STREAM_RECORD) {
1033 if (negative)
1034 *negative = 1;
1035
1036 return t-c;
1037 } else
1038 return 0;
1039 } else
1040 return c-t;
1041 }
1042
1043 pa_usec_t pa_stream_get_latency(pa_stream *s, const pa_latency_info *i, int *negative) {
1044 pa_usec_t t, c;
1045
1046 assert(s);
1047 assert(s->ref >= 1);
1048 assert(i);
1049
1050 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1);
1051 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1);
1052
1053 t = pa_stream_get_time(s, i);
1054 c = pa_bytes_to_usec(s->counter, &s->sample_spec);
1055
1056 return time_counter_diff(s, t, c, negative);
1057 }
1058
1059 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
1060 assert(s);
1061 assert(s->ref >= 1);
1062
1063 return &s->sample_spec;
1064 }
1065
1066 const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
1067 assert(s);
1068 assert(s->ref >= 1);
1069
1070 return &s->channel_map;
1071 }
1072
1073 void pa_stream_trash_ipol(pa_stream *s) {
1074 assert(s);
1075 assert(s->ref >= 1);
1076
1077 if (!s->interpolate)
1078 return;
1079
1080 memset(&s->ipol_timestamp, 0, sizeof(s->ipol_timestamp));
1081 s->ipol_usec = 0;
1082 }
1083
1084 pa_usec_t pa_stream_get_interpolated_time(pa_stream *s) {
1085 pa_usec_t usec;
1086
1087 assert(s);
1088 assert(s->ref >= 1);
1089
1090 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1);
1091 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1);
1092 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->interpolate, PA_ERR_BADSTATE, (pa_usec_t) -1);
1093
1094 if (s->corked)
1095 usec = s->ipol_usec;
1096 else {
1097 if (s->ipol_timestamp.tv_sec == 0)
1098 usec = 0;
1099 else
1100 usec = s->ipol_usec + pa_timeval_age(&s->ipol_timestamp);
1101 }
1102
1103 if (usec < s->previous_ipol_time)
1104 usec = s->previous_ipol_time;
1105
1106 s->previous_ipol_time = usec;
1107
1108 return usec;
1109 }
1110
1111 pa_usec_t pa_stream_get_interpolated_latency(pa_stream *s, int *negative) {
1112 pa_usec_t t, c;
1113
1114 assert(s);
1115 assert(s->ref >= 1);
1116
1117 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (pa_usec_t) -1);
1118 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, (pa_usec_t) -1);
1119 PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->interpolate, PA_ERR_BADSTATE, (pa_usec_t) -1);
1120
1121 t = pa_stream_get_interpolated_time(s);
1122 c = pa_bytes_to_usec(s->counter, &s->sample_spec);
1123 return time_counter_diff(s, t, c, negative);
1124 }