]> code.delx.au - pulseaudio/blob - polyp/polyplib.c
introduce pa_xmalloc() and friends
[pulseaudio] / polyp / polyplib.c
1 /* $Id$ */
2
3 /***
4 This file is part of polypaudio.
5
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
10
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <assert.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <netdb.h>
33
34 #include "polyplib.h"
35 #include "native-common.h"
36 #include "pdispatch.h"
37 #include "pstream.h"
38 #include "dynarray.h"
39 #include "socket-client.h"
40 #include "pstream-util.h"
41 #include "authkey.h"
42 #include "util.h"
43 #include "xmalloc.h"
44
45 #define DEFAULT_MAXLENGTH 204800
46 #define DEFAULT_TLENGTH 10240
47 #define DEFAULT_PREBUF 4096
48 #define DEFAULT_MINREQ 1024
49 #define DEFAULT_FRAGSIZE 1024
50
51 #define DEFAULT_TIMEOUT (5*60)
52 #define DEFAULT_SERVER "/tmp/polypaudio/native"
53 #define DEFAULT_PORT "4713"
54
55 struct pa_context {
56 char *name;
57 struct pa_mainloop_api* mainloop;
58 struct pa_socket_client *client;
59 struct pa_pstream *pstream;
60 struct pa_pdispatch *pdispatch;
61 struct pa_dynarray *record_streams, *playback_streams;
62 struct pa_stream *first_stream;
63 uint32_t ctag;
64 uint32_t error;
65 enum {
66 CONTEXT_UNCONNECTED,
67 CONTEXT_CONNECTING,
68 CONTEXT_AUTHORIZING,
69 CONTEXT_SETTING_NAME,
70 CONTEXT_READY,
71 CONTEXT_DEAD
72 } state;
73
74 void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
75 void *connect_complete_userdata;
76
77 void (*drain_complete_callback)(struct pa_context*c, void *userdata);
78 void *drain_complete_userdata;
79
80 void (*die_callback)(struct pa_context*c, void *userdata);
81 void *die_userdata;
82
83 void (*stat_callback)(struct pa_context*c, uint32_t count, uint32_t total, void *userdata);
84 void *stat_userdata;
85
86 void (*play_sample_callback)(struct pa_context*c, int success, void *userdata);
87 void *play_sample_userdata;
88
89 void (*remove_sample_callback)(struct pa_context*c, int success, void *userdata);
90 void *remove_sample_userdata;
91
92 uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
93 };
94
95 struct pa_stream {
96 struct pa_context *context;
97 struct pa_stream *next, *previous;
98
99 char *name;
100 struct pa_buffer_attr buffer_attr;
101 struct pa_sample_spec sample_spec;
102 uint32_t channel;
103 int channel_valid;
104 uint32_t device_index;
105 enum pa_stream_direction direction;
106
107 enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
108 uint32_t requested_bytes;
109
110 void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
111 void *read_userdata;
112
113 void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
114 void *write_userdata;
115
116 void (*create_complete_callback)(struct pa_stream *s, int success, void *userdata);
117 void *create_complete_userdata;
118
119 void (*drain_complete_callback)(struct pa_stream *s, void *userdata);
120 void *drain_complete_userdata;
121
122 void (*die_callback)(struct pa_stream*c, void *userdata);
123 void *die_userdata;
124
125 void (*get_latency_callback)(struct pa_stream*c, uint32_t latency, void *userdata);
126 void *get_latency_userdata;
127
128 void (*finish_sample_callback)(struct pa_stream*c, int success, void *userdata);
129 void *finish_sample_userdata;
130 };
131
132 static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
133 static void command_playback_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
134
135 static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
136 [PA_COMMAND_ERROR] = { NULL },
137 [PA_COMMAND_REPLY] = { NULL },
138 [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
139 [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
140 [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
141 [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
142 [PA_COMMAND_EXIT] = { NULL },
143 [PA_COMMAND_REQUEST] = { command_request },
144 [PA_COMMAND_PLAYBACK_STREAM_KILLED] = { command_playback_stream_killed },
145 [PA_COMMAND_RECORD_STREAM_KILLED] = { command_playback_stream_killed },
146 };
147
148 struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
149 struct pa_context *c;
150 assert(mainloop && name);
151
152 c = pa_xmalloc(sizeof(struct pa_context));
153 c->name = pa_xstrdup(name);
154 c->mainloop = mainloop;
155 c->client = NULL;
156 c->pstream = NULL;
157 c->pdispatch = NULL;
158 c->playback_streams = pa_dynarray_new();
159 assert(c->playback_streams);
160 c->record_streams = pa_dynarray_new();
161 assert(c->record_streams);
162 c->first_stream = NULL;
163 c->error = PA_ERROR_OK;
164 c->state = CONTEXT_UNCONNECTED;
165 c->ctag = 0;
166
167 c->connect_complete_callback = NULL;
168 c->connect_complete_userdata = NULL;
169
170 c->drain_complete_callback = NULL;
171 c->drain_complete_userdata = NULL;
172
173 c->die_callback = NULL;
174 c->die_userdata = NULL;
175
176 c->stat_callback = NULL;
177 c->stat_userdata = NULL;
178
179 c->play_sample_callback = NULL;
180 c->play_sample_userdata = NULL;
181
182 c->remove_sample_callback = NULL;
183 c->remove_sample_userdata = NULL;
184
185 pa_check_for_sigpipe();
186 return c;
187 }
188
189 void pa_context_free(struct pa_context *c) {
190 assert(c);
191
192 while (c->first_stream)
193 pa_stream_free(c->first_stream);
194
195 if (c->client)
196 pa_socket_client_free(c->client);
197 if (c->pdispatch)
198 pa_pdispatch_free(c->pdispatch);
199 if (c->pstream)
200 pa_pstream_free(c->pstream);
201 if (c->record_streams)
202 pa_dynarray_free(c->record_streams, NULL, NULL);
203 if (c->playback_streams)
204 pa_dynarray_free(c->playback_streams, NULL, NULL);
205
206 pa_xfree(c->name);
207 pa_xfree(c);
208 }
209
210 static void stream_dead(struct pa_stream *s) {
211 assert(s);
212
213 if (s->state == STREAM_DEAD)
214 return;
215
216 if (s->state == STREAM_READY) {
217 s->state = STREAM_DEAD;
218 if (s->die_callback)
219 s->die_callback(s, s->die_userdata);
220 } else
221 s->state = STREAM_DEAD;
222 }
223
224 static void context_dead(struct pa_context *c) {
225 struct pa_stream *s;
226 assert(c);
227
228 if (c->state == CONTEXT_DEAD)
229 return;
230
231 if (c->pdispatch)
232 pa_pdispatch_free(c->pdispatch);
233 c->pdispatch = NULL;
234
235 if (c->pstream)
236 pa_pstream_free(c->pstream);
237 c->pstream = NULL;
238
239 if (c->client)
240 pa_socket_client_free(c->client);
241 c->client = NULL;
242
243 for (s = c->first_stream; s; s = s->next)
244 stream_dead(s);
245
246 if (c->state == CONTEXT_READY) {
247 c->state = CONTEXT_DEAD;
248 if (c->die_callback)
249 c->die_callback(c, c->die_userdata);
250 } else
251 c->state = CONTEXT_DEAD;
252 }
253
254 static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
255 struct pa_context *c = userdata;
256 assert(p && c);
257 c->error = PA_ERROR_CONNECTIONTERMINATED;
258 context_dead(c);
259 }
260
261 static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
262 struct pa_context *c = userdata;
263 assert(p && packet && c);
264
265 if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
266 fprintf(stderr, "polyp.c: invalid packet.\n");
267 c->error = PA_ERROR_PROTOCOL;
268 context_dead(c);
269 }
270 }
271
272 static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
273 struct pa_context *c = userdata;
274 struct pa_stream *s;
275 assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
276
277 if (!(s = pa_dynarray_get(c->record_streams, channel)))
278 return;
279
280 if (s->read_callback)
281 s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
282 }
283
284 static int handle_error(struct pa_context *c, uint32_t command, struct pa_tagstruct *t) {
285 assert(c && t);
286
287 if (command == PA_COMMAND_ERROR) {
288 if (pa_tagstruct_getu32(t, &c->error) < 0) {
289 c->error = PA_ERROR_PROTOCOL;
290 return -1;
291 }
292
293 return 0;
294 }
295
296 c->error = (command == PA_COMMAND_TIMEOUT) ? PA_ERROR_TIMEOUT : PA_ERROR_INTERNAL;
297 return -1;
298 }
299
300 static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
301 struct pa_context *c = userdata;
302 assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME));
303
304 if (command != PA_COMMAND_REPLY) {
305 handle_error(c, command, t);
306 context_dead(c);
307
308 if (c->connect_complete_callback)
309 c->connect_complete_callback(c, 0, c->connect_complete_userdata);
310
311 return;
312 }
313
314 if (c->state == CONTEXT_AUTHORIZING) {
315 struct pa_tagstruct *t;
316 c->state = CONTEXT_SETTING_NAME;
317 t = pa_tagstruct_new(NULL, 0);
318 assert(t);
319 pa_tagstruct_putu32(t, PA_COMMAND_SET_NAME);
320 pa_tagstruct_putu32(t, tag = c->ctag++);
321 pa_tagstruct_puts(t, c->name);
322 pa_pstream_send_tagstruct(c->pstream, t);
323 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
324 } else {
325 assert(c->state == CONTEXT_SETTING_NAME);
326
327 c->state = CONTEXT_READY;
328
329 if (c->connect_complete_callback)
330 c->connect_complete_callback(c, 1, c->connect_complete_userdata);
331 }
332
333 return;
334 }
335
336 static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) {
337 struct pa_context *c = userdata;
338 struct pa_tagstruct *t;
339 uint32_t tag;
340 assert(client && c && c->state == CONTEXT_CONNECTING);
341
342 pa_socket_client_free(client);
343 c->client = NULL;
344
345 if (!io) {
346 c->error = PA_ERROR_CONNECTIONREFUSED;
347 context_dead(c);
348
349 if (c->connect_complete_callback)
350 c->connect_complete_callback(c, 0, c->connect_complete_userdata);
351
352 return;
353 }
354
355 c->pstream = pa_pstream_new(c->mainloop, io);
356 assert(c->pstream);
357 pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
358 pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
359 pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
360
361 c->pdispatch = pa_pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
362 assert(c->pdispatch);
363
364 t = pa_tagstruct_new(NULL, 0);
365 assert(t);
366 pa_tagstruct_putu32(t, PA_COMMAND_AUTH);
367 pa_tagstruct_putu32(t, tag = c->ctag++);
368 pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie));
369 pa_pstream_send_tagstruct(c->pstream, t);
370 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
371 c->state = CONTEXT_AUTHORIZING;
372 }
373
374 static struct sockaddr *resolve_server(const char *server, size_t *len) {
375 struct sockaddr *sa;
376 struct addrinfo hints, *result = NULL;
377 char *port;
378 assert(server && len);
379
380 if ((port = strrchr(server, ':')))
381 port++;
382 if (!port)
383 port = DEFAULT_PORT;
384
385 memset(&hints, 0, sizeof(hints));
386 hints.ai_family = PF_UNSPEC;
387 hints.ai_socktype = SOCK_STREAM;
388 hints.ai_protocol = 0;
389
390 if (getaddrinfo(server, port, &hints, &result) != 0)
391 return NULL;
392 assert(result);
393
394 sa = pa_xmalloc(*len = result->ai_addrlen);
395 memcpy(sa, result->ai_addr, *len);
396
397 freeaddrinfo(result);
398
399 return sa;
400 }
401
402 int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
403 assert(c && c->state == CONTEXT_UNCONNECTED);
404
405 if (pa_authkey_load_from_home(PA_NATIVE_COOKIE_FILE, c->auth_cookie, sizeof(c->auth_cookie)) < 0) {
406 c->error = PA_ERROR_AUTHKEY;
407 return -1;
408 }
409
410 if (!server)
411 if (!(server = getenv("POLYP_SERVER")))
412 server = DEFAULT_SERVER;
413
414 assert(!c->client);
415
416 if (*server == '/') {
417 if (!(c->client = pa_socket_client_new_unix(c->mainloop, server))) {
418 c->error = PA_ERROR_CONNECTIONREFUSED;
419 return -1;
420 }
421 } else {
422 struct sockaddr* sa;
423 size_t sa_len;
424
425 if (!(sa = resolve_server(server, &sa_len))) {
426 c->error = PA_ERROR_INVALIDSERVER;
427 return -1;
428 }
429
430 c->client = pa_socket_client_new_sockaddr(c->mainloop, sa, sa_len);
431 pa_xfree(sa);
432
433 if (!c->client) {
434 c->error = PA_ERROR_CONNECTIONREFUSED;
435 return -1;
436 }
437 }
438
439 c->connect_complete_callback = complete;
440 c->connect_complete_userdata = userdata;
441
442 pa_socket_client_set_callback(c->client, on_connection, c);
443 c->state = CONTEXT_CONNECTING;
444
445 return 0;
446 }
447
448 int pa_context_is_dead(struct pa_context *c) {
449 assert(c);
450 return c->state == CONTEXT_DEAD;
451 }
452
453 int pa_context_is_ready(struct pa_context *c) {
454 assert(c);
455 return c->state == CONTEXT_READY;
456 }
457
458 int pa_context_errno(struct pa_context *c) {
459 assert(c);
460 return c->error;
461 }
462
463 void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
464 assert(c);
465 c->die_callback = cb;
466 c->die_userdata = userdata;
467 }
468
469 static void command_playback_stream_killed(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
470 struct pa_context *c = userdata;
471 struct pa_stream *s;
472 uint32_t channel;
473 assert(pd && (command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED) && t && c);
474
475 if (pa_tagstruct_getu32(t, &channel) < 0 ||
476 !pa_tagstruct_eof(t)) {
477 c->error = PA_ERROR_PROTOCOL;
478 context_dead(c);
479 return;
480 }
481
482 if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
483 return;
484
485 c->error = PA_ERROR_KILLED;
486 stream_dead(s);
487 }
488
489 static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
490 struct pa_stream *s;
491 struct pa_context *c = userdata;
492 uint32_t bytes, channel;
493 assert(pd && command == PA_COMMAND_REQUEST && t && c);
494
495 if (pa_tagstruct_getu32(t, &channel) < 0 ||
496 pa_tagstruct_getu32(t, &bytes) < 0 ||
497 !pa_tagstruct_eof(t)) {
498 c->error = PA_ERROR_PROTOCOL;
499 context_dead(c);
500 return;
501 }
502
503 if (!(s = pa_dynarray_get(c->playback_streams, channel)))
504 return;
505
506 if (s->state != STREAM_READY)
507 return;
508
509 s->requested_bytes += bytes;
510
511 if (s->requested_bytes && s->write_callback)
512 s->write_callback(s, s->requested_bytes, s->write_userdata);
513 }
514
515 static void create_stream_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
516 struct pa_stream *s = userdata;
517 assert(pd && s && s->state == STREAM_CREATING);
518
519 if (command != PA_COMMAND_REPLY) {
520 if (handle_error(s->context, command, t) < 0) {
521 context_dead(s->context);
522 return;
523 }
524
525 stream_dead(s);
526 if (s->create_complete_callback)
527 s->create_complete_callback(s, 0, s->create_complete_userdata);
528
529 return;
530 }
531
532 if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
533 ((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) ||
534 !pa_tagstruct_eof(t)) {
535 s->context->error = PA_ERROR_PROTOCOL;
536 context_dead(s->context);
537 return;
538 }
539
540 s->channel_valid = 1;
541 pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
542
543 s->state = STREAM_READY;
544 if (s->create_complete_callback)
545 s->create_complete_callback(s, 1, s->create_complete_userdata);
546 }
547
548 static void create_stream(struct pa_stream *s, const char *dev) {
549 struct pa_tagstruct *t;
550 uint32_t tag;
551 assert(s);
552
553 s->state = STREAM_CREATING;
554
555 t = pa_tagstruct_new(NULL, 0);
556 assert(t);
557
558 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
559 pa_tagstruct_putu32(t, tag = s->context->ctag++);
560 pa_tagstruct_puts(t, s->name);
561 pa_tagstruct_put_sample_spec(t, &s->sample_spec);
562 pa_tagstruct_putu32(t, (uint32_t) -1);
563 pa_tagstruct_puts(t, dev ? dev : "");
564 pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
565 if (s->direction == PA_STREAM_PLAYBACK) {
566 pa_tagstruct_putu32(t, s->buffer_attr.tlength);
567 pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
568 pa_tagstruct_putu32(t, s->buffer_attr.minreq);
569 } else
570 pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
571
572 pa_pstream_send_tagstruct(s->context->pstream, t);
573 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, s);
574 }
575
576 static struct pa_stream *internal_stream_new(struct pa_context *c) {
577 struct pa_stream *s;
578
579 s = pa_xmalloc(sizeof(struct pa_stream));
580 s->context = c;
581
582 s->read_callback = NULL;
583 s->read_userdata = NULL;
584 s->write_callback = NULL;
585 s->write_userdata = NULL;
586 s->die_callback = NULL;
587 s->die_userdata = NULL;
588 s->create_complete_callback = NULL;
589 s->create_complete_userdata = NULL;
590 s->get_latency_callback = NULL;
591 s->get_latency_userdata = NULL;
592 s->finish_sample_callback = NULL;
593 s->finish_sample_userdata = NULL;
594
595 s->name = NULL;
596 s->state = STREAM_CREATING;
597 s->requested_bytes = 0;
598 s->channel = 0;
599 s->channel_valid = 0;
600 s->device_index = (uint32_t) -1;
601
602 memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
603
604 s->next = c->first_stream;
605 if (s->next)
606 s->next->previous = s;
607 s->previous = NULL;
608 c->first_stream = s;
609
610 return s;
611 }
612
613 struct pa_stream* pa_stream_new(
614 struct pa_context *c,
615 enum pa_stream_direction dir,
616 const char *dev,
617 const char *name,
618 const struct pa_sample_spec *ss,
619 const struct pa_buffer_attr *attr,
620 void (*complete) (struct pa_stream*s, int success, void *userdata),
621 void *userdata) {
622
623 struct pa_stream *s;
624
625 assert(c && name && ss && c->state == CONTEXT_READY && (dir == PA_STREAM_PLAYBACK || dir == PA_STREAM_RECORD));
626
627 s = internal_stream_new(c);
628 assert(s);
629
630 s->create_complete_callback = complete;
631 s->create_complete_userdata = userdata;
632 s->name = pa_xstrdup(name);
633 s->state = STREAM_CREATING;
634 s->direction = dir;
635 s->sample_spec = *ss;
636 if (attr)
637 s->buffer_attr = *attr;
638 else {
639 s->buffer_attr.maxlength = DEFAULT_MAXLENGTH;
640 s->buffer_attr.tlength = DEFAULT_TLENGTH;
641 s->buffer_attr.prebuf = DEFAULT_PREBUF;
642 s->buffer_attr.minreq = DEFAULT_MINREQ;
643 s->buffer_attr.fragsize = DEFAULT_FRAGSIZE;
644 }
645
646 create_stream(s, dev);
647
648 return s;
649 }
650
651 void pa_stream_free(struct pa_stream *s) {
652 assert(s && s->context);
653
654 if (s->context->pdispatch)
655 pa_pdispatch_unregister_reply(s->context->pdispatch, s);
656
657 pa_xfree(s->name);
658
659 if (s->channel_valid && s->context->state == CONTEXT_READY) {
660 struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0);
661 assert(t);
662
663 pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
664 (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM));
665 pa_tagstruct_putu32(t, s->context->ctag++);
666 pa_tagstruct_putu32(t, s->channel);
667 pa_pstream_send_tagstruct(s->context->pstream, t);
668 }
669
670 if (s->channel_valid)
671 pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
672
673 if (s->next)
674 s->next->previous = s->previous;
675 if (s->previous)
676 s->previous->next = s->next;
677 else
678 s->context->first_stream = s->next;
679
680 pa_xfree(s);
681 }
682
683 void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
684 s->write_callback = cb;
685 s->write_userdata = userdata;
686 }
687
688 void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
689 struct pa_memchunk chunk;
690 assert(s && s->context && data && length && s->state == STREAM_READY);
691
692 chunk.memblock = pa_memblock_new(length);
693 assert(chunk.memblock && chunk.memblock->data);
694 memcpy(chunk.memblock->data, data, length);
695 chunk.index = 0;
696 chunk.length = length;
697
698 pa_pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
699 pa_memblock_unref(chunk.memblock);
700
701 /*fprintf(stderr, "Sent %u bytes\n", length);*/
702
703 if (length < s->requested_bytes)
704 s->requested_bytes -= length;
705 else
706 s->requested_bytes = 0;
707 }
708
709 size_t pa_stream_writable_size(struct pa_stream *s) {
710 assert(s && s->state == STREAM_READY);
711 return s->requested_bytes;
712 }
713
714 void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
715 assert(s && cb);
716 s->read_callback = cb;
717 s->read_userdata = userdata;
718 }
719
720 int pa_stream_is_dead(struct pa_stream *s) {
721 return s->state == STREAM_DEAD;
722 }
723
724 int pa_stream_is_ready(struct pa_stream*s) {
725 return s->state == STREAM_READY;
726 }
727
728 void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
729 assert(s);
730 s->die_callback = cb;
731 s->die_userdata = userdata;
732 }
733
734 int pa_context_is_pending(struct pa_context *c) {
735 assert(c);
736
737 if (c->state != CONTEXT_READY)
738 return 0;
739
740 return pa_pstream_is_pending(c->pstream) || pa_pdispatch_is_pending(c->pdispatch);
741 }
742
743 struct pa_context* pa_stream_get_context(struct pa_stream *p) {
744 assert(p);
745 return p->context;
746 }
747
748 static void set_dispatch_callbacks(struct pa_context *c);
749
750 static void pdispatch_drain_callback(struct pa_pdispatch*pd, void *userdata) {
751 set_dispatch_callbacks(userdata);
752 }
753
754 static void pstream_drain_callback(struct pa_pstream *s, void *userdata) {
755 set_dispatch_callbacks(userdata);
756 }
757
758 static void set_dispatch_callbacks(struct pa_context *c) {
759 assert(c && c->state == CONTEXT_READY);
760
761 pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
762 pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
763
764 if (pa_pdispatch_is_pending(c->pdispatch)) {
765 pa_pdispatch_set_drain_callback(c->pdispatch, pdispatch_drain_callback, c);
766 return;
767 }
768
769 if (pa_pstream_is_pending(c->pstream)) {
770 pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
771 return;
772 }
773
774 assert(c->drain_complete_callback);
775 c->drain_complete_callback(c, c->drain_complete_userdata);
776 }
777
778 int pa_context_drain(
779 struct pa_context *c,
780 void (*complete) (struct pa_context*c, void *userdata),
781 void *userdata) {
782
783 assert(c && c->state == CONTEXT_READY);
784
785 if (complete == NULL) {
786 c->drain_complete_callback = NULL;
787 pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
788 pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
789 return 0;
790 }
791
792 if (!pa_context_is_pending(c))
793 return -1;
794
795 c->drain_complete_callback = complete;
796 c->drain_complete_userdata = userdata;
797
798 set_dispatch_callbacks(c);
799
800 return 0;
801 }
802
803 static void stream_drain_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
804 struct pa_stream *s = userdata;
805 assert(pd && s);
806
807 if (command != PA_COMMAND_REPLY) {
808 if (handle_error(s->context, command, t) < 0) {
809 context_dead(s->context);
810 return;
811 }
812
813 stream_dead(s);
814 return;
815 }
816
817 if (s->state != STREAM_READY)
818 return;
819
820 if (!pa_tagstruct_eof(t)) {
821 s->context->error = PA_ERROR_PROTOCOL;
822 context_dead(s->context);
823 return;
824 }
825
826 if (s->drain_complete_callback) {
827 void (*temp) (struct pa_stream*s, void *userdata) = s->drain_complete_callback;
828 s->drain_complete_callback = NULL;
829 temp(s, s->drain_complete_userdata);
830 }
831 }
832
833
834 void pa_stream_drain(struct pa_stream *s, void (*complete) (struct pa_stream*s, void *userdata), void *userdata) {
835 struct pa_tagstruct *t;
836 uint32_t tag;
837 assert(s && s->state == STREAM_READY);
838
839 if (!complete) {
840 s->drain_complete_callback = NULL;
841 return;
842 }
843
844 s->drain_complete_callback = complete;
845 s->drain_complete_userdata = userdata;
846
847 t = pa_tagstruct_new(NULL, 0);
848 assert(t);
849
850 pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM);
851 pa_tagstruct_putu32(t, tag = s->context->ctag++);
852 pa_tagstruct_putu32(t, s->channel);
853 pa_pstream_send_tagstruct(s->context->pstream, t);
854 pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_drain_callback, s);
855 }
856
857 void pa_context_exit(struct pa_context *c) {
858 struct pa_tagstruct *t;
859 t = pa_tagstruct_new(NULL, 0);
860 assert(t);
861 pa_tagstruct_putu32(t, PA_COMMAND_EXIT);
862 pa_tagstruct_putu32(t, c->ctag++);
863 pa_pstream_send_tagstruct(c->pstream, t);
864 }
865
866 static void context_stat_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
867 struct pa_context *c = userdata;
868 uint32_t total, count;
869 assert(pd && c);
870
871 if (command != PA_COMMAND_REPLY) {
872 if (handle_error(c, command, t) < 0) {
873 context_dead(c);
874 return;
875 }
876
877 if (c->stat_callback)
878 c->stat_callback(c, (uint32_t) -1, (uint32_t) -1, c->stat_userdata);
879 return;
880 }
881
882 if (pa_tagstruct_getu32(t, &count) < 0 ||
883 pa_tagstruct_getu32(t, &total) < 0 ||
884 !pa_tagstruct_eof(t)) {
885 c->error = PA_ERROR_PROTOCOL;
886 context_dead(c);
887 return;
888 }
889
890 if (c->stat_callback)
891 c->stat_callback(c, count, total, c->stat_userdata);
892 }
893
894 void pa_context_stat(struct pa_context *c, void (*cb)(struct pa_context *c, uint32_t count, uint32_t total, void *userdata), void *userdata) {
895 uint32_t tag;
896 struct pa_tagstruct *t;
897
898 c->stat_callback = cb;
899 c->stat_userdata = userdata;
900
901 if (cb == NULL)
902 return;
903
904 t = pa_tagstruct_new(NULL, 0);
905 assert(t);
906 pa_tagstruct_putu32(t, PA_COMMAND_STAT);
907 pa_tagstruct_putu32(t, tag = c->ctag++);
908 pa_pstream_send_tagstruct(c->pstream, t);
909 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, context_stat_callback, c);
910 }
911
912 static void stream_get_latency_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
913 struct pa_stream *s = userdata;
914 uint32_t latency;
915 assert(pd && s);
916
917 if (command != PA_COMMAND_REPLY) {
918 if (handle_error(s->context, command, t) < 0) {
919 context_dead(s->context);
920 return;
921 }
922
923 if (s->get_latency_callback)
924 s->get_latency_callback(s, (uint32_t) -1, s->get_latency_userdata);
925 return;
926 }
927
928 if (pa_tagstruct_getu32(t, &latency) < 0 ||
929 !pa_tagstruct_eof(t)) {
930 s->context->error = PA_ERROR_PROTOCOL;
931 context_dead(s->context);
932 return;
933 }
934
935 if (s->get_latency_callback)
936 s->get_latency_callback(s, latency, s->get_latency_userdata);
937 }
938
939 void pa_stream_get_latency(struct pa_stream *p, void (*cb)(struct pa_stream *p, uint32_t latency, void *userdata), void *userdata) {
940 uint32_t tag;
941 struct pa_tagstruct *t;
942
943 p->get_latency_callback = cb;
944 p->get_latency_userdata = userdata;
945
946 if (cb == NULL)
947 return;
948
949 t = pa_tagstruct_new(NULL, 0);
950 assert(t);
951 pa_tagstruct_putu32(t, PA_COMMAND_GET_PLAYBACK_LATENCY);
952 pa_tagstruct_putu32(t, tag = p->context->ctag++);
953 pa_tagstruct_putu32(t, p->channel);
954 pa_pstream_send_tagstruct(p->context->pstream, t);
955 pa_pdispatch_register_reply(p->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_latency_callback, p);
956 }
957
958 struct pa_stream* pa_context_upload_sample(struct pa_context *c, const char *name, const struct pa_sample_spec *ss, size_t length, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata) {
959 struct pa_stream *s;
960 struct pa_tagstruct *t;
961 uint32_t tag;
962
963 s = internal_stream_new(c);
964 assert(s);
965
966 s->create_complete_callback = cb;
967 s->create_complete_userdata = userdata;
968 s->name = pa_xstrdup(name);
969 s->state = STREAM_CREATING;
970 s->direction = PA_STREAM_UPLOAD;
971 s->sample_spec = *ss;
972
973 t = pa_tagstruct_new(NULL, 0);
974 assert(t);
975 pa_tagstruct_putu32(t, PA_COMMAND_CREATE_UPLOAD_STREAM);
976 pa_tagstruct_putu32(t, tag = c->ctag++);
977 pa_tagstruct_puts(t, name);
978 pa_tagstruct_put_sample_spec(t, ss);
979 pa_tagstruct_putu32(t, length);
980 pa_pstream_send_tagstruct(c->pstream, t);
981 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_stream_callback, s);
982
983 return s;
984 }
985
986 static void stream_finish_sample_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
987 struct pa_stream *s = userdata;
988 assert(pd && s);
989
990 if (command != PA_COMMAND_REPLY) {
991 if (handle_error(s->context, command, t) < 0) {
992 context_dead(s->context);
993 return;
994 }
995
996 if (s->finish_sample_callback)
997 s->finish_sample_callback(s, 0, s->finish_sample_userdata);
998 return;
999 }
1000
1001 if (!pa_tagstruct_eof(t)) {
1002 s->context->error = PA_ERROR_PROTOCOL;
1003 context_dead(s->context);
1004 return;
1005 }
1006
1007 if (s->finish_sample_callback)
1008 s->finish_sample_callback(s, 1, s->finish_sample_userdata);
1009 }
1010
1011 void pa_stream_finish_sample(struct pa_stream *p, void (*cb)(struct pa_stream*s, int success, void *userdata), void *userdata) {
1012 struct pa_tagstruct *t;
1013 uint32_t tag;
1014 assert(p);
1015
1016 p->finish_sample_callback = cb;
1017 p->finish_sample_userdata = userdata;
1018
1019 t = pa_tagstruct_new(NULL, 0);
1020 assert(t);
1021 pa_tagstruct_putu32(t, PA_COMMAND_FINISH_UPLOAD_STREAM);
1022 pa_tagstruct_putu32(t, tag = p->context->ctag++);
1023 pa_tagstruct_putu32(t, p->channel);
1024 pa_pstream_send_tagstruct(p->context->pstream, t);
1025 pa_pdispatch_register_reply(p->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_finish_sample_callback, p);
1026 }
1027
1028 static void context_play_sample_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1029 struct pa_context *c = userdata;
1030 assert(pd && c);
1031
1032 if (command != PA_COMMAND_REPLY) {
1033 if (handle_error(c, command, t) < 0) {
1034 context_dead(c);
1035 return;
1036 }
1037
1038 if (c->play_sample_callback)
1039 c->play_sample_callback(c, 0, c->play_sample_userdata);
1040 return;
1041 }
1042
1043 if (!pa_tagstruct_eof(t)) {
1044 c->error = PA_ERROR_PROTOCOL;
1045 context_dead(c);
1046 return;
1047 }
1048
1049 if (c->play_sample_callback)
1050 c->play_sample_callback(c, 1, c->play_sample_userdata);
1051 }
1052
1053 void pa_context_play_sample(struct pa_context *c, const char *name, const char *dev, uint32_t volume, void (*cb)(struct pa_context *c, int success, void *userdata), void *userdata) {
1054 struct pa_tagstruct *t;
1055 uint32_t tag;
1056 assert(c && name && *name && (!dev || *dev));
1057
1058 if (!volume)
1059 return;
1060
1061 c->play_sample_callback = cb;
1062 c->play_sample_userdata = userdata;
1063
1064 t = pa_tagstruct_new(NULL, 0);
1065 assert(t);
1066 pa_tagstruct_putu32(t, PA_COMMAND_PLAY_SAMPLE);
1067 pa_tagstruct_putu32(t, tag = c->ctag++);
1068 pa_tagstruct_putu32(t, (uint32_t) -1);
1069 pa_tagstruct_puts(t, dev ? dev : "");
1070 pa_tagstruct_putu32(t, volume);
1071 pa_tagstruct_puts(t, name);
1072 pa_pstream_send_tagstruct(c->pstream, t);
1073 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, context_play_sample_callback, c);
1074 }
1075
1076 static void context_remove_sample_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
1077 struct pa_context *c = userdata;
1078 assert(pd && c);
1079
1080 if (command != PA_COMMAND_REPLY) {
1081 if (handle_error(c, command, t) < 0) {
1082 context_dead(c);
1083 return;
1084 }
1085
1086 if (c->remove_sample_callback)
1087 c->remove_sample_callback(c, 0, c->remove_sample_userdata);
1088 return;
1089 }
1090
1091 if (!pa_tagstruct_eof(t)) {
1092 c->error = PA_ERROR_PROTOCOL;
1093 context_dead(c);
1094 return;
1095 }
1096
1097 if (c->remove_sample_callback)
1098 c->remove_sample_callback(c, 1, c->remove_sample_userdata);
1099 }
1100
1101 void pa_context_remove_sample(struct pa_context *c, const char *name, void (*cb)(struct pa_context *c, int success, void *userdata), void *userdata) {
1102 struct pa_tagstruct *t;
1103 uint32_t tag;
1104 assert(c && name);
1105
1106 c->remove_sample_callback = cb;
1107 c->remove_sample_userdata = userdata;
1108
1109 t = pa_tagstruct_new(NULL, 0);
1110 assert(t);
1111 pa_tagstruct_putu32(t, PA_COMMAND_REMOVE_SAMPLE);
1112 pa_tagstruct_putu32(t, tag = c->ctag++);
1113 pa_tagstruct_puts(t, name);
1114 pa_pstream_send_tagstruct(c->pstream, t);
1115 pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, context_remove_sample_callback, c);
1116 }