]> code.delx.au - pulseaudio/blob - polyp/polyplib-stream.c
7b9d6863d507b2f79f1486da325679636da4d31d
[pulseaudio] / polyp / polyplib-stream.c
1 /* $Id$ */
2
3 /***
4 This file is part of polypaudio.
5
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
10
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <assert.h>
27 #include <string.h>
28 #include <stdio.h>
29 #include <string.h>
30
31 #include "polyplib-internal.h"
32 #include "xmalloc.h"
33 #include "pstream-util.h"
34 #include "util.h"
35 #include "log.h"
36
37 #define LATENCY_IPOL_INTERVAL_USEC (10000L)
38
39 pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
40 pa_stream *s;
41 assert(c);
42 assert(ss);
43
44 if (!pa_sample_spec_valid(ss))
45 return NULL;
46
47 if (map && !pa_channel_map_valid(map))
48 return NULL;
49
50 s = pa_xnew(pa_stream, 1);
51 s->ref = 1;
52 s->context = c;
53 s->mainloop = c->mainloop;
54
55 s->read_callback = NULL;
56 s->read_userdata = NULL;
57 s->write_callback = NULL;
58 s->write_userdata = NULL;
59 s->state_callback = NULL;
60 s->state_userdata = NULL;
61
62 s->direction = PA_STREAM_NODIRECTION;
63 s->name = pa_xstrdup(name);
64 s->sample_spec = *ss;
65
66 if (map)
67 s->channel_map = *map;
68 else
69 pa_channel_map_init_auto(&s->channel_map, ss->channels);
70
71 s->channel = 0;
72 s->channel_valid = 0;
73 s->device_index = PA_INVALID_INDEX;
74 s->requested_bytes = 0;
75 s->state = PA_STREAM_DISCONNECTED;
76 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
77
78 s->mcalign = pa_mcalign_new(pa_frame_size(ss), c->memblock_stat);
79
80 s->counter = 0;
81 s->previous_time = 0;
82 s->previous_ipol_time = 0;
83
84 s->corked = 0;
85 s->interpolate = 0;
86
87 s->ipol_usec = 0;
88 memset(&s->ipol_timestamp, 0, sizeof(s->ipol_timestamp));
89 s->ipol_event = NULL;
90 s->ipol_requested = 0;
91
92 PA_LLIST_PREPEND(pa_stream, c->streams, s);
93
94 return pa_stream_ref(s);
95 }
96
97 static void stream_free(pa_stream *s) {
98 assert(s);
99
100 if (s->ipol_event) {
101 assert(s->mainloop);
102 s->mainloop->time_free(s->ipol_event);
103 }
104
105 pa_mcalign_free(s->mcalign);
106
107 pa_xfree(s->name);
108 pa_xfree(s);
109 }
110
111 void pa_stream_unref(pa_stream *s) {
112 assert(s && s->ref >= 1);
113
114 if (--(s->ref) == 0)
115 stream_free(s);
116 }
117
118 pa_stream* pa_stream_ref(pa_stream *s) {
119 assert(s && s->ref >= 1);
120 s->ref++;
121 return s;
122 }
123
124 pa_stream_state_t pa_stream_get_state(pa_stream *s) {
125 assert(s && s->ref >= 1);
126 return s->state;
127 }
128
129 pa_context* pa_stream_get_context(pa_stream *s) {
130 assert(s && s->ref >= 1);
131 return s->context;
132 }
133
134 uint32_t pa_stream_get_index(pa_stream *s) {
135 assert(s && s->ref >= 1);
136 return s->device_index;
137 }
138
139 void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
140 assert(s && s->ref >= 1);
141
142 if (s->state == st)
143 return;
144
145 pa_stream_ref(s);
146
147 s->state = st;
148
149 if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) {
150 if (s->channel_valid)
151 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
152
153 PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
154 pa_stream_unref(s);
155 }
156
157 if (s->state_callback)
158 s->state_callback(s, s->state_userdata);
159
160 pa_stream_unref(s);
161 }
162
163 void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
164 pa_context *c = userdata;
165 pa_stream *s;
166 uint32_t channel;
167 assert(pd && (command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED) && t && c);
168
169 pa_context_ref(c);
170
171 if (pa_tagstruct_getu32(t, &channel) < 0 ||
172 !pa_tagstruct_eof(t)) {
173 pa_context_fail(c, PA_ERROR_PROTOCOL);
174 goto finish;
175 }
176
177 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
178 goto finish;
179
180 c->error = PA_ERROR_KILLED;
181 pa_stream_set_state(s, PA_STREAM_FAILED);
182
183 finish:
184 pa_context_unref(c);
185 }
186
187 void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
188 pa_stream *s;
189 pa_context *c = userdata;
190 uint32_t bytes, channel;
191 assert(pd && command == PA_COMMAND_REQUEST && t && c);
192
193 pa_context_ref(c);
194
195 if (pa_tagstruct_getu32(t, &channel) < 0 ||
196 pa_tagstruct_getu32(t, &bytes) < 0 ||
197 !pa_tagstruct_eof(t)) {
198 pa_context_fail(c, PA_ERROR_PROTOCOL);
199 goto finish;
200 }
201
202 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
203 goto finish;
204
205 if (s->state != PA_STREAM_READY)
206 goto finish;
207
208 pa_stream_ref(s);
209
210 s->requested_bytes += bytes;
211
212 if (s->requested_bytes && s->write_callback)
213 s->write_callback(s, s->requested_bytes, s->write_userdata);
214
215 pa_stream_unref(s);
216
217 finish:
218 pa_context_unref(c);
219 }
220
221 static void ipol_callback(pa_mainloop_api *m, pa_time_event *e, PA_GCC_UNUSED const struct timeval *tv, void *userdata) {
222 struct timeval tv2;
223 pa_stream *s = userdata;
224
225 pa_stream_ref(s);
226
227 /* pa_log("requesting new ipol data\n"); */
228
229 if (s->state == PA_STREAM_READY && !s->ipol_requested) {
230 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
231 s->ipol_requested = 1;
232 }
233
234 pa_gettimeofday(&tv2);
235 pa_timeval_add(&tv2, LATENCY_IPOL_INTERVAL_USEC);
236
237 m->time_restart(e, &tv2);
238
239 pa_stream_unref(s);
240 }
241
242
243 void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
244 pa_stream *s = userdata;
245 assert(pd && s && s->state == PA_STREAM_CREATING);
246
247 pa_stream_ref(s);
248
249 if (command != PA_COMMAND_REPLY) {
250 if (pa_context_handle_error(s->context, command, t) < 0)
251 goto finish;
252
253 pa_stream_set_state(s, PA_STREAM_FAILED);
254 goto finish;
255 }
256
257 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
258 ((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) ||
259 ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0) ||
260 !pa_tagstruct_eof(t)) {
261 pa_context_fail(s->context, PA_ERROR_PROTOCOL);
262 goto finish;
263 }
264
265 s->channel_valid = 1;
266 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
267 pa_stream_set_state(s, PA_STREAM_READY);
268
269 if (s->interpolate) {
270 struct timeval tv;
271 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
272
273 pa_gettimeofday(&tv);
274 tv.tv_usec += LATENCY_IPOL_INTERVAL_USEC; /* every 100 ms */
275
276 assert(!s->ipol_event);
277 s->ipol_event = s->mainloop->time_new(s->mainloop, &tv, &ipol_callback, s);
278 }
279
280 if (s->requested_bytes && s->ref > 1 && s->write_callback)
281 s->write_callback(s, s->requested_bytes, s->write_userdata);
282
283 finish:
284 pa_stream_unref(s);
285 }
286
287 static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, const pa_cvolume *volume) {
288 pa_tagstruct *t;
289 uint32_t tag;
290 assert(s && s->ref >= 1 && s->state == PA_STREAM_DISCONNECTED);
291
292 pa_stream_ref(s);
293
294 s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY);
295 pa_stream_trash_ipol(s);
296
297 if (attr)
298 s->buffer_attr = *attr;
299 else {
300 /* half a second */
301 s->buffer_attr.tlength = pa_bytes_per_second(&s->sample_spec)/2;
302 s->buffer_attr.maxlength = (s->buffer_attr.tlength*3)/2;
303 s->buffer_attr.minreq = s->buffer_attr.tlength/100;
304 s->buffer_attr.prebuf = s->buffer_attr.tlength - s->buffer_attr.minreq;
305 s->buffer_attr.fragsize = s->buffer_attr.tlength/100;
306 }
307
308 pa_stream_set_state(s, PA_STREAM_CREATING);
309
310 t = pa_tagstruct_new(NULL, 0);
311 assert(t);
312
313 if (!dev) {
314 if (s->direction == PA_STREAM_PLAYBACK)
315 dev = s->context->conf->default_sink;
316 else
317 dev = s->context->conf->default_source;
318 }
319
320 pa_tagstruct_put(t,
321 PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
322 PA_TAG_U32, tag = s->context->ctag++,
323 PA_TAG_STRING, s->name,
324 PA_TAG_SAMPLE_SPEC, &s->sample_spec,
325 PA_TAG_CHANNEL_MAP, &s->channel_map,
326 PA_TAG_U32, PA_INVALID_INDEX,
327 PA_TAG_STRING, dev,
328 PA_TAG_U32, s->buffer_attr.maxlength,
329 PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
330 PA_TAG_INVALID);
331
332 if (s->direction == PA_STREAM_PLAYBACK) {
333 pa_cvolume cv;
334 pa_tagstruct_put(t,
335 PA_TAG_U32, s->buffer_attr.tlength,
336 PA_TAG_U32, s->buffer_attr.prebuf,
337 PA_TAG_U32, s->buffer_attr.minreq,
338 PA_TAG_INVALID);
339
340 if (!volume) {
341 pa_cvolume_reset(&cv, s->sample_spec.channels);
342 volume = &cv;
343 }
344
345 pa_tagstruct_put_cvolume(t, volume);
346 } else
347 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
348
349 pa_pstream_send_tagstruct(s->context->pstream, t);
350 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s);
351
352 pa_stream_unref(s);
353 }
354
355 void pa_stream_connect_playback(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, pa_cvolume *volume) {
356 assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1);
357 s->direction = PA_STREAM_PLAYBACK;
358 create_stream(s, dev, attr, flags, volume);
359 }
360
361 void pa_stream_connect_record(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags) {
362 assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1);
363 s->direction = PA_STREAM_RECORD;
364 create_stream(s, dev, attr, flags, 0);
365 }
366
367 void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) {
368 pa_memchunk chunk;
369 assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
370
371 if (free_cb) {
372 chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
373 assert(chunk.memblock && chunk.memblock->data);
374 } else {
375 chunk.memblock = pa_memblock_new(length, s->context->memblock_stat);
376 assert(chunk.memblock && chunk.memblock->data);
377 memcpy(chunk.memblock->data, data, length);
378 }
379 chunk.index = 0;
380 chunk.length = length;
381
382 pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk);
383 pa_memblock_unref(chunk.memblock);
384
385 if (length < s->requested_bytes)
386 s->requested_bytes -= length;
387 else
388 s->requested_bytes = 0;
389
390 s->counter += length;
391 }
392
393 size_t pa_stream_writable_size(pa_stream *s) {
394 assert(s && s->ref >= 1);
395 return s->state == PA_STREAM_READY ? s->requested_bytes : 0;
396 }
397
398 pa_operation * pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) {
399 pa_operation *o;
400 pa_tagstruct *t;
401 uint32_t tag;
402 assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
403
404 o = pa_operation_new(s->context, s);
405 assert(o);
406 o->callback = (pa_operation_callback) cb;
407 o->userdata = userdata;
408
409 t = pa_tagstruct_new(NULL, 0);
410 assert(t);
411 pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM);
412 pa_tagstruct_putu32(t, tag = s->context->ctag++);
413 pa_tagstruct_putu32(t, s->channel);
414 pa_pstream_send_tagstruct(s->context->pstream, t);
415 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
416
417 return pa_operation_ref(o);
418 }
419
420 static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
421 pa_operation *o = userdata;
422 pa_latency_info i, *p = NULL;
423 struct timeval local, remote, now;
424 assert(pd && o && o->stream && o->context);
425
426 if (command != PA_COMMAND_REPLY) {
427 if (pa_context_handle_error(o->context, command, t) < 0)
428 goto finish;
429
430 } else if (pa_tagstruct_get_usec(t, &i.buffer_usec) < 0 ||
431 pa_tagstruct_get_usec(t, &i.sink_usec) < 0 ||
432 pa_tagstruct_get_usec(t, &i.source_usec) < 0 ||
433 pa_tagstruct_get_boolean(t, &i.playing) < 0 ||
434 pa_tagstruct_getu32(t, &i.queue_length) < 0 ||
435 pa_tagstruct_get_timeval(t, &local) < 0 ||
436 pa_tagstruct_get_timeval(t, &remote) < 0 ||
437 pa_tagstruct_getu64(t, &i.counter) < 0 ||
438 !pa_tagstruct_eof(t)) {
439 pa_context_fail(o->context, PA_ERROR_PROTOCOL);
440 goto finish;
441 } else {
442 pa_gettimeofday(&now);
443
444 if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
445 /* local and remote seem to have synchronized clocks */
446
447 if (o->stream->direction == PA_STREAM_PLAYBACK)
448 i.transport_usec = pa_timeval_diff(&remote, &local);
449 else
450 i.transport_usec = pa_timeval_diff(&now, &remote);
451
452 i.synchronized_clocks = 1;
453 i.timestamp = remote;
454 } else {
455 /* clocks are not synchronized, let's estimate latency then */
456 i.transport_usec = pa_timeval_diff(&now, &local)/2;
457 i.synchronized_clocks = 0;
458 i.timestamp = local;
459 pa_timeval_add(&i.timestamp, i.transport_usec);
460 }
461
462 if (o->stream->interpolate) {
463 /* pa_log("new interpol data\n"); */
464 o->stream->ipol_timestamp = i.timestamp;
465 o->stream->ipol_usec = pa_stream_get_time(o->stream, &i);
466 o->stream->ipol_requested = 0;
467 }
468
469 p = &i;
470 }
471
472 if (o->callback) {
473 void (*cb)(pa_stream *s, const pa_latency_info *_i, void *_userdata) = (void (*)(pa_stream *s, const pa_latency_info *_i, void *_userdata)) o->callback;
474 cb(o->stream, p, o->userdata);
475 }
476
477 finish:
478 pa_operation_done(o);
479 pa_operation_unref(o);
480 }
481
482 pa_operation* pa_stream_get_latency_info(pa_stream *s, void (*cb)(pa_stream *p, const pa_latency_info*i, void *userdata), void *userdata) {
483 uint32_t tag;
484 pa_operation *o;
485 pa_tagstruct *t;
486 struct timeval now;
487 assert(s && s->direction != PA_STREAM_UPLOAD);
488
489 o = pa_operation_new(s->context, s);
490 assert(o);
491 o->callback = (pa_operation_callback) cb;
492 o->userdata = userdata;
493
494 t = pa_tagstruct_new(NULL, 0);
495 assert(t);
496 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY);
497 pa_tagstruct_putu32(t, tag = s->context->ctag++);
498 pa_tagstruct_putu32(t, s->channel);
499
500 pa_gettimeofday(&now);
501 pa_tagstruct_put_timeval(t, &now);
502 pa_tagstruct_putu64(t, s->counter);
503
504 pa_pstream_send_tagstruct(s->context->pstream, t);
505 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_info_callback, o);
506
507 return pa_operation_ref(o);
508 }
509
510 void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
511 pa_stream *s = userdata;
512 assert(pd && s && s->ref >= 1);
513
514 pa_stream_ref(s);
515
516 if (command != PA_COMMAND_REPLY) {
517 if (pa_context_handle_error(s->context, command, t) < 0)
518 goto finish;
519
520 pa_stream_set_state(s, PA_STREAM_FAILED);
521 goto finish;
522 } else if (!pa_tagstruct_eof(t)) {
523 pa_context_fail(s->context, PA_ERROR_PROTOCOL);
524 goto finish;
525 }
526
527 pa_stream_set_state(s, PA_STREAM_TERMINATED);
528
529 finish:
530 pa_stream_unref(s);
531 }
532
533 void pa_stream_disconnect(pa_stream *s) {
534 pa_tagstruct *t;
535 uint32_t tag;
536 assert(s && s->ref >= 1);
537
538 if (!s->channel_valid || !s->context->state == PA_CONTEXT_READY)
539 return;
540
541 pa_stream_ref(s);
542
543 t = pa_tagstruct_new(NULL, 0);
544 assert(t);
545
546 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
547 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM));
548 pa_tagstruct_putu32(t, tag = s->context->ctag++);
549 pa_tagstruct_putu32(t, s->channel);
550 pa_pstream_send_tagstruct(s->context->pstream, t);
551 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s);
552
553 pa_stream_unref(s);
554 }
555
556 void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
557 assert(s && s->ref >= 1);
558 s->read_callback = cb;
559 s->read_userdata = userdata;
560 }
561
562 void pa_stream_set_write_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) {
563 assert(s && s->ref >= 1);
564 s->write_callback = cb;
565 s->write_userdata = userdata;
566 }
567
568 void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata) {
569 assert(s && s->ref >= 1);
570 s->state_callback = cb;
571 s->state_userdata = userdata;
572 }
573
574 void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
575 pa_operation *o = userdata;
576 int success = 1;
577 assert(pd && o && o->context && o->ref >= 1);
578
579 if (command != PA_COMMAND_REPLY) {
580 if (pa_context_handle_error(o->context, command, t) < 0)
581 goto finish;
582
583 success = 0;
584 } else if (!pa_tagstruct_eof(t)) {
585 pa_context_fail(o->context, PA_ERROR_PROTOCOL);
586 goto finish;
587 }
588
589 if (o->callback) {
590 void (*cb)(pa_stream *s, int _success, void *_userdata) = (void (*)(pa_stream *s, int _success, void *_userdata)) o->callback;
591 cb(o->stream, success, o->userdata);
592 }
593
594 finish:
595 pa_operation_done(o);
596 pa_operation_unref(o);
597 }
598
599 pa_operation* pa_stream_cork(pa_stream *s, int b, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) {
600 pa_operation *o;
601 pa_tagstruct *t;
602 uint32_t tag;
603 assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
604
605 if (s->interpolate) {
606 if (!s->corked && b)
607 /* Pausing */
608 s->ipol_usec = pa_stream_get_interpolated_time(s);
609 else if (s->corked && !b)
610 /* Unpausing */
611 pa_gettimeofday(&s->ipol_timestamp);
612 }
613
614 s->corked = b;
615
616 o = pa_operation_new(s->context, s);
617 assert(o);
618 o->callback = (pa_operation_callback) cb;
619 o->userdata = userdata;
620
621 t = pa_tagstruct_new(NULL, 0);
622 assert(t);
623 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM);
624 pa_tagstruct_putu32(t, tag = s->context->ctag++);
625 pa_tagstruct_putu32(t, s->channel);
626 pa_tagstruct_put_boolean(t, !!b);
627 pa_pstream_send_tagstruct(s->context->pstream, t);
628 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
629
630 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
631
632 return pa_operation_ref(o);
633 }
634
635 static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata) {
636 pa_tagstruct *t;
637 pa_operation *o;
638 uint32_t tag;
639 assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
640
641 o = pa_operation_new(s->context, s);
642 o->callback = (pa_operation_callback) cb;
643 o->userdata = userdata;
644
645 t = pa_tagstruct_new(NULL, 0);
646 pa_tagstruct_putu32(t, command);
647 pa_tagstruct_putu32(t, tag = s->context->ctag++);
648 pa_tagstruct_putu32(t, s->channel);
649 pa_pstream_send_tagstruct(s->context->pstream, t);
650 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
651
652 return pa_operation_ref(o);
653 }
654
655 pa_operation* pa_stream_flush(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata) {
656 pa_operation *o;
657 o = stream_send_simple_command(s, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM, cb, userdata);
658 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
659 return o;
660 }
661
662 pa_operation* pa_stream_prebuf(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata) {
663 pa_operation *o;
664 o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata);
665 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
666 return o;
667 }
668
669 pa_operation* pa_stream_trigger(pa_stream *s, void (*cb)(pa_stream *s, int success, void *userdata), void *userdata) {
670 pa_operation *o;
671 o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata);
672 pa_operation_unref(pa_stream_get_latency_info(s, NULL, NULL));
673 return o;
674 }
675
676 pa_operation* pa_stream_set_name(pa_stream *s, const char *name, void(*cb)(pa_stream*c, int success, void *userdata), void *userdata) {
677 pa_operation *o;
678 pa_tagstruct *t;
679 uint32_t tag;
680 assert(s && s->ref >= 1 && s->state == PA_STREAM_READY && name && s->direction != PA_STREAM_UPLOAD);
681
682 o = pa_operation_new(s->context, s);
683 assert(o);
684 o->callback = (pa_operation_callback) cb;
685 o->userdata = userdata;
686
687 t = pa_tagstruct_new(NULL, 0);
688 assert(t);
689 pa_tagstruct_putu32(t, s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME);
690 pa_tagstruct_putu32(t, tag = s->context->ctag++);
691 pa_tagstruct_putu32(t, s->channel);
692 pa_tagstruct_puts(t, name);
693 pa_pstream_send_tagstruct(s->context->pstream, t);
694 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
695
696 return pa_operation_ref(o);
697 }
698
699 uint64_t pa_stream_get_counter(pa_stream *s) {
700 assert(s);
701 return s->counter;
702 }
703
704 pa_usec_t pa_stream_get_time(pa_stream *s, const pa_latency_info *i) {
705 pa_usec_t usec;
706 assert(s);
707
708 usec = pa_bytes_to_usec(i->counter, &s->sample_spec);
709
710 if (i) {
711 if (s->direction == PA_STREAM_PLAYBACK) {
712 pa_usec_t latency = i->transport_usec + i->buffer_usec + i->sink_usec;
713 if (usec < latency)
714 usec = 0;
715 else
716 usec -= latency;
717
718 } else if (s->direction == PA_STREAM_RECORD) {
719 usec += i->source_usec + i->buffer_usec + i->transport_usec;
720
721 if (usec > i->sink_usec)
722 usec -= i->sink_usec;
723 else
724 usec = 0;
725 }
726 }
727
728 if (usec < s->previous_time)
729 usec = s->previous_time;
730
731 s->previous_time = usec;
732
733 return usec;
734 }
735
736 static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t t, pa_usec_t c, int *negative) {
737 assert(s);
738
739 if (negative)
740 *negative = 0;
741
742 if (c < t) {
743 if (s->direction == PA_STREAM_RECORD) {
744 if (negative)
745 *negative = 1;
746
747 return t-c;
748 } else
749 return 0;
750 } else
751 return c-t;
752 }
753
754 pa_usec_t pa_stream_get_latency(pa_stream *s, const pa_latency_info *i, int *negative) {
755 pa_usec_t t, c;
756 assert(s && i);
757
758 t = pa_stream_get_time(s, i);
759 c = pa_bytes_to_usec(s->counter, &s->sample_spec);
760
761 return time_counter_diff(s, t, c, negative);
762 }
763
764 const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
765 assert(s);
766 return &s->sample_spec;
767 }
768
769 void pa_stream_trash_ipol(pa_stream *s) {
770 assert(s);
771
772 if (!s->interpolate)
773 return;
774
775 memset(&s->ipol_timestamp, 0, sizeof(s->ipol_timestamp));
776 s->ipol_usec = 0;
777 }
778
779 pa_usec_t pa_stream_get_interpolated_time(pa_stream *s) {
780 pa_usec_t usec;
781 assert(s && s->interpolate);
782
783 if (s->corked)
784 usec = s->ipol_usec;
785 else {
786 if (s->ipol_timestamp.tv_sec == 0)
787 usec = 0;
788 else
789 usec = s->ipol_usec + pa_timeval_age(&s->ipol_timestamp);
790 }
791
792 if (usec < s->previous_ipol_time)
793 usec = s->previous_ipol_time;
794
795 s->previous_ipol_time = usec;
796
797 return usec;
798 }
799
800 pa_usec_t pa_stream_get_interpolated_latency(pa_stream *s, int *negative) {
801 pa_usec_t t, c;
802 assert(s && s->interpolate);
803
804 t = pa_stream_get_interpolated_time(s);
805 c = pa_bytes_to_usec(s->counter, &s->sample_spec);
806 return time_counter_diff(s, t, c, negative);
807 }