]> code.delx.au - pulseaudio/blob - src/pulsecore/pstream.c
* add new --system command line parameter to the daemon for running PulseAudio as...
[pulseaudio] / src / pulsecore / pstream.c
1 /* $Id$ */
2
3 /***
4 This file is part of PulseAudio.
5
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
10
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <assert.h>
29 #include <unistd.h>
30 #include <sys/socket.h>
31 #include <sys/un.h>
32
33 #ifdef HAVE_NETINET_IN_H
34 #include <netinet/in.h>
35 #endif
36
37 #include "winsock.h"
38
39 #include <pulse/xmalloc.h>
40
41 #include <pulsecore/queue.h>
42 #include <pulsecore/log.h>
43 #include <pulsecore/core-scache.h>
44
45 #include "pstream.h"
46
/* Indices of the 32 bit words that make up a frame descriptor. All words
 * are stored in network byte order while inside a descriptor. */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH = 0,  /* payload length in bytes */
    PA_PSTREAM_DESCRIPTOR_CHANNEL,     /* channel index; (uint32_t) -1 marks a packet frame */
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,   /* upper 32 bits of the 64 bit offset */
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,   /* lower 32 bits of the 64 bit offset */
    PA_PSTREAM_DESCRIPTOR_SEEK,        /* seek mode */
    PA_PSTREAM_DESCRIPTOR_MAX          /* number of descriptor words */
};

typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

/* Size of a frame descriptor in bytes */
#define PA_PSTREAM_DESCRIPTOR_SIZE (sizeof(pa_pstream_descriptor))

/* Allow uploading a single sample in one frame at max */
#define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX
60
61 struct item_info {
62 enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
63
64 /* memblock info */
65 pa_memchunk chunk;
66 uint32_t channel;
67 int64_t offset;
68 pa_seek_mode_t seek_mode;
69
70 /* packet info */
71 pa_packet *packet;
72 #ifdef SCM_CREDENTIALS
73 int with_creds;
74 struct ucred creds;
75 #endif
76 };
77
78 struct pa_pstream {
79 int ref;
80
81 pa_mainloop_api *mainloop;
82 pa_defer_event *defer_event;
83 pa_iochannel *io;
84 pa_queue *send_queue;
85
86 int dead;
87
88 struct {
89 struct item_info* current;
90 pa_pstream_descriptor descriptor;
91 void *data;
92 size_t index;
93 } write;
94
95 struct {
96 pa_memblock *memblock;
97 pa_packet *packet;
98 pa_pstream_descriptor descriptor;
99 void *data;
100 size_t index;
101 } read;
102
103 pa_pstream_packet_cb_t recieve_packet_callback;
104 void *recieve_packet_callback_userdata;
105
106 pa_pstream_memblock_cb_t recieve_memblock_callback;
107 void *recieve_memblock_callback_userdata;
108
109 pa_pstream_notify_cb_t drain_callback;
110 void *drain_callback_userdata;
111
112 pa_pstream_notify_cb_t die_callback;
113 void *die_callback_userdata;
114
115 pa_memblock_stat *memblock_stat;
116
117 #ifdef SCM_CREDENTIALS
118 struct ucred read_creds, write_creds;
119 int read_creds_valid, send_creds_now;
120 #endif
121 };
122
123 static int do_write(pa_pstream *p);
124 static int do_read(pa_pstream *p);
125
126 static void do_something(pa_pstream *p) {
127 assert(p);
128
129 p->mainloop->defer_enable(p->defer_event, 0);
130
131 pa_pstream_ref(p);
132
133 if (!p->dead && pa_iochannel_is_readable(p->io)) {
134 if (do_read(p) < 0)
135 goto fail;
136 } else if (!p->dead && pa_iochannel_is_hungup(p->io))
137 goto fail;
138
139 if (!p->dead && pa_iochannel_is_writable(p->io)) {
140 if (do_write(p) < 0)
141 goto fail;
142 }
143
144 pa_pstream_unref(p);
145 return;
146
147 fail:
148
149 p->dead = 1;
150
151 if (p->die_callback)
152 p->die_callback(p, p->die_callback_userdata);
153
154 pa_pstream_unref(p);
155 }
156
157 static void io_callback(pa_iochannel*io, void *userdata) {
158 pa_pstream *p = userdata;
159
160 assert(p);
161 assert(p->io == io);
162
163 do_something(p);
164 }
165
166 static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
167 pa_pstream *p = userdata;
168
169 assert(p);
170 assert(p->defer_event == e);
171 assert(p->mainloop == m);
172
173 do_something(p);
174 }
175
176 pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) {
177 pa_pstream *p;
178 assert(io);
179
180 p = pa_xnew(pa_pstream, 1);
181
182 p->ref = 1;
183 p->io = io;
184 pa_iochannel_set_callback(io, io_callback, p);
185
186 p->dead = 0;
187
188 p->mainloop = m;
189 p->defer_event = m->defer_new(m, defer_callback, p);
190 m->defer_enable(p->defer_event, 0);
191
192 p->send_queue = pa_queue_new();
193 assert(p->send_queue);
194
195 p->write.current = NULL;
196 p->write.index = 0;
197
198 p->read.memblock = NULL;
199 p->read.packet = NULL;
200 p->read.index = 0;
201
202 p->recieve_packet_callback = NULL;
203 p->recieve_packet_callback_userdata = NULL;
204
205 p->recieve_memblock_callback = NULL;
206 p->recieve_memblock_callback_userdata = NULL;
207
208 p->drain_callback = NULL;
209 p->drain_callback_userdata = NULL;
210
211 p->die_callback = NULL;
212 p->die_callback_userdata = NULL;
213
214 p->memblock_stat = s;
215
216 pa_iochannel_socket_set_rcvbuf(io, 1024*8);
217 pa_iochannel_socket_set_sndbuf(io, 1024*8);
218
219 #ifdef SCM_CREDENTIALS
220 p->send_creds_now = 0;
221 p->read_creds_valid = 0;
222 #endif
223 return p;
224 }
225
226 static void item_free(void *item, PA_GCC_UNUSED void *p) {
227 struct item_info *i = item;
228 assert(i);
229
230 if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
231 assert(i->chunk.memblock);
232 pa_memblock_unref(i->chunk.memblock);
233 } else {
234 assert(i->type == PA_PSTREAM_ITEM_PACKET);
235 assert(i->packet);
236 pa_packet_unref(i->packet);
237 }
238
239 pa_xfree(i);
240 }
241
242 static void pstream_free(pa_pstream *p) {
243 assert(p);
244
245 pa_pstream_close(p);
246
247 pa_queue_free(p->send_queue, item_free, NULL);
248
249 if (p->write.current)
250 item_free(p->write.current, NULL);
251
252 if (p->read.memblock)
253 pa_memblock_unref(p->read.memblock);
254
255 if (p->read.packet)
256 pa_packet_unref(p->read.packet);
257
258 pa_xfree(p);
259 }
260
261 void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const struct ucred *creds) {
262 struct item_info *i;
263 assert(p && packet && p->ref >= 1);
264
265 if (p->dead)
266 return;
267
268 /* pa_log(__FILE__": push-packet %p", packet); */
269
270 i = pa_xnew(struct item_info, 1);
271 i->type = PA_PSTREAM_ITEM_PACKET;
272 i->packet = pa_packet_ref(packet);
273 #ifdef SCM_CREDENTIALS
274 if ((i->with_creds = !!creds))
275 i->creds = *creds;
276 #endif
277
278 pa_queue_push(p->send_queue, i);
279 p->mainloop->defer_enable(p->defer_event, 1);
280 }
281
282 void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
283 struct item_info *i;
284 assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
285
286 if (p->dead)
287 return;
288
289 /* pa_log(__FILE__": push-memblock %p", chunk); */
290
291 i = pa_xnew(struct item_info, 1);
292 i->type = PA_PSTREAM_ITEM_MEMBLOCK;
293 i->chunk = *chunk;
294 i->channel = channel;
295 i->offset = offset;
296 i->seek_mode = seek_mode;
297 #ifdef SCM_CREDENTIALS
298 i->with_creds = 0;
299 #endif
300
301 pa_memblock_ref(i->chunk.memblock);
302
303 pa_queue_push(p->send_queue, i);
304 p->mainloop->defer_enable(p->defer_event, 1);
305 }
306
307 static void prepare_next_write_item(pa_pstream *p) {
308 assert(p);
309
310 if (!(p->write.current = pa_queue_pop(p->send_queue)))
311 return;
312
313 p->write.index = 0;
314
315 if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
316 /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/
317
318 assert(p->write.current->packet);
319 p->write.data = p->write.current->packet->data;
320 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
321 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
322 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
323 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
324 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0;
325
326
327 } else {
328 assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
329 p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
330 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
331 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
332 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
333 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
334 p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode);
335 }
336
337 #ifdef SCM_CREDENTIALS
338 if ((p->send_creds_now = p->write.current->with_creds))
339 p->write_creds = p->write.current->creds;
340
341 #endif
342
343 }
344
345 static int do_write(pa_pstream *p) {
346 void *d;
347 size_t l;
348 ssize_t r;
349 assert(p);
350
351 if (!p->write.current)
352 prepare_next_write_item(p);
353
354 if (!p->write.current)
355 return 0;
356
357 assert(p->write.data);
358
359 if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
360 d = (uint8_t*) p->write.descriptor + p->write.index;
361 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
362 } else {
363 d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
364 l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
365 }
366
367 #ifdef SCM_CREDENTIALS
368 if (p->send_creds_now) {
369
370 if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
371 return -1;
372
373 p->send_creds_now = 0;
374 } else
375 #endif
376
377 if ((r = pa_iochannel_write(p->io, d, l)) < 0)
378 return -1;
379
380 p->write.index += r;
381
382 if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
383 assert(p->write.current);
384 item_free(p->write.current, (void *) 1);
385 p->write.current = NULL;
386
387 if (p->drain_callback && !pa_pstream_is_pending(p))
388 p->drain_callback(p, p->drain_callback_userdata);
389 }
390
391 return 0;
392 }
393
394 static int do_read(pa_pstream *p) {
395 void *d;
396 size_t l;
397 ssize_t r;
398 assert(p);
399
400 if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
401 d = (uint8_t*) p->read.descriptor + p->read.index;
402 l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
403 } else {
404 assert(p->read.data);
405 d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
406 l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
407 }
408
409 #ifdef SCM_CREDENTIALS
410 {
411 int b = 0;
412
413 if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
414 return -1;
415
416 p->read_creds_valid = p->read_creds_valid || b;
417 }
418 #else
419 if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
420 return -1;
421 #endif
422
423 p->read.index += r;
424
425 if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
426 /* Reading of frame descriptor complete */
427
428 /* Frame size too large */
429 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
430 pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX);
431 return -1;
432 }
433
434 assert(!p->read.packet && !p->read.memblock);
435
436 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
437 /* Frame is a packet frame */
438 p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
439 p->read.data = p->read.packet->data;
440 } else {
441 /* Frame is a memblock frame */
442 p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
443 p->read.data = p->read.memblock->data;
444
445 if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) {
446 pa_log_warn(__FILE__": Invalid seek mode");
447 return -1;
448 }
449 }
450
451 } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
452 /* Frame payload available */
453
454 if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
455 l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
456
457 if (l > 0) {
458 pa_memchunk chunk;
459
460 chunk.memblock = p->read.memblock;
461 chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
462 chunk.length = l;
463
464 if (p->recieve_memblock_callback) {
465 int64_t offset;
466
467 offset = (int64_t) (
468 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
469 (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
470
471 p->recieve_memblock_callback(
472 p,
473 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
474 offset,
475 ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]),
476 &chunk,
477 p->recieve_memblock_callback_userdata);
478 }
479
480 /* Drop seek info for following callbacks */
481 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] =
482 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
483 p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
484 }
485 }
486
487 /* Frame complete */
488 if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
489 if (p->read.memblock) {
490 assert(!p->read.packet);
491
492 pa_memblock_unref(p->read.memblock);
493 p->read.memblock = NULL;
494 } else {
495 assert(p->read.packet);
496
497 if (p->recieve_packet_callback)
498 #ifdef SCM_CREDENTIALS
499 p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
500 #else
501 p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
502 #endif
503
504 pa_packet_unref(p->read.packet);
505 p->read.packet = NULL;
506 }
507
508 p->read.index = 0;
509 #ifdef SCM_CREDENTIALS
510 p->read_creds_valid = 0;
511 #endif
512 }
513 }
514
515 return 0;
516 }
517
518 void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
519 assert(p);
520 assert(p->ref >= 1);
521
522 p->die_callback = cb;
523 p->die_callback_userdata = userdata;
524 }
525
526
527 void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
528 assert(p);
529 assert(p->ref >= 1);
530
531 p->drain_callback = cb;
532 p->drain_callback_userdata = userdata;
533 }
534
535 void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
536 assert(p);
537 assert(p->ref >= 1);
538
539 p->recieve_packet_callback = cb;
540 p->recieve_packet_callback_userdata = userdata;
541 }
542
543 void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
544 assert(p);
545 assert(p->ref >= 1);
546
547 p->recieve_memblock_callback = cb;
548 p->recieve_memblock_callback_userdata = userdata;
549 }
550
551 int pa_pstream_is_pending(pa_pstream *p) {
552 assert(p);
553
554 if (p->dead)
555 return 0;
556
557 return p->write.current || !pa_queue_is_empty(p->send_queue);
558 }
559
560 void pa_pstream_unref(pa_pstream*p) {
561 assert(p);
562 assert(p->ref >= 1);
563
564 if (--p->ref == 0)
565 pstream_free(p);
566 }
567
568 pa_pstream* pa_pstream_ref(pa_pstream*p) {
569 assert(p);
570 assert(p->ref >= 1);
571
572 p->ref++;
573 return p;
574 }
575
576 void pa_pstream_close(pa_pstream *p) {
577 assert(p);
578
579 p->dead = 1;
580
581 if (p->io) {
582 pa_iochannel_free(p->io);
583 p->io = NULL;
584 }
585
586 if (p->defer_event) {
587 p->mainloop->defer_free(p->defer_event);
588 p->defer_event = NULL;
589 }
590
591 p->die_callback = NULL;
592 p->drain_callback = NULL;
593 p->recieve_packet_callback = NULL;
594 p->recieve_memblock_callback = NULL;
595 }