/*
 * code.delx.au - pulseaudio/blob - src/modules/rtp/module-rtp-recv.c
 * make sure we don't apply sampling rate fixes that bring the sampling freq > PA_RATE_MAX
 * [pulseaudio] / src / modules / rtp / module-rtp-recv.c
 */
1
2 /***
3 This file is part of PulseAudio.
4
5 Copyright 2006 Lennart Poettering
6
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as published
9 by the Free Software Foundation; either version 2.1 of the License,
10 or (at your option) any later version.
11
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20 USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <poll.h>
35
36 #include <pulse/timeval.h>
37 #include <pulse/xmalloc.h>
38
39 #include <pulsecore/core-error.h>
40 #include <pulsecore/module.h>
41 #include <pulsecore/llist.h>
42 #include <pulsecore/sink.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/memblockq.h>
45 #include <pulsecore/log.h>
46 #include <pulsecore/core-util.h>
47 #include <pulsecore/modargs.h>
48 #include <pulsecore/namereg.h>
49 #include <pulsecore/sample-util.h>
50 #include <pulsecore/macro.h>
51 #include <pulsecore/atomic.h>
52 #include <pulsecore/rtclock.h>
53 #include <pulsecore/atomic.h>
54 #include <pulsecore/time-smoother.h>
55
56 #include "module-rtp-recv-symdef.h"
57
58 #include "rtp.h"
59 #include "sdp.h"
60 #include "sap.h"
61
62 PA_MODULE_AUTHOR("Lennart Poettering");
63 PA_MODULE_DESCRIPTION("Recieve data from a network via RTP/SAP/SDP");
64 PA_MODULE_VERSION(PACKAGE_VERSION);
65 PA_MODULE_LOAD_ONCE(FALSE);
66 PA_MODULE_USAGE(
67 "sink=<name of the sink> "
68 "sap_address=<multicast address to listen on> "
69 );
70
/* UDP port the SAP announcement address is given in pa__init() */
#define SAP_PORT 9875

/* SAP multicast group used when no sap_address= argument is passed */
#define DEFAULT_SAP_ADDRESS "224.0.0.56"

/* Limit handed to pa_memblockq_new() for every session queue */
#define MEMBLOCKQ_MAXLENGTH (40*1024*1024)

/* session_new() refuses to create a session once this many exist */
#define MAX_SESSIONS 16

/* Seconds: a session whose last activity is older than this is freed, and
 * the death check timer is re-armed with the same period */
#define DEATH_TIMEOUT 20

/* Minimum time that has to pass between two sample rate corrections */
#define RATE_UPDATE_INTERVAL (5*PA_USEC_PER_SEC)

/* Initial value of a session's intended latency */
#define LATENCY_USEC (500*PA_USEC_PER_MSEC)
78
/* Argument names handed to pa_modargs_new() in pa__init(); the list is
 * NULL terminated. */
static const char* const valid_modargs[] = { "sink", "sap_address", NULL };
84
85 struct session {
86 struct userdata *userdata;
87 PA_LLIST_FIELDS(struct session);
88
89 pa_sink_input *sink_input;
90 pa_memblockq *memblockq;
91
92 pa_bool_t first_packet;
93 uint32_t ssrc;
94 uint32_t offset;
95
96 struct pa_sdp_info sdp_info;
97
98 pa_rtp_context rtp_context;
99
100 pa_rtpoll_item *rtpoll_item;
101
102 pa_atomic_t timestamp;
103
104 pa_smoother *smoother;
105 pa_usec_t intended_latency;
106 pa_usec_t sink_latency;
107
108 pa_usec_t last_rate_update;
109 };
110
111 struct userdata {
112 pa_module *module;
113
114 pa_sap_context sap_context;
115 pa_io_event* sap_event;
116
117 pa_time_event *check_death_event;
118
119 char *sink_name;
120
121 PA_LLIST_HEAD(struct session, sessions);
122 pa_hashmap *by_origin;
123 int n_sessions;
124 };
125
/* Forward declaration: sink_input_kill() calls session_free() before its
 * definition further down. */
static void session_free(struct session *s);
127
128 /* Called from I/O thread context */
129 static int sink_input_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
130 struct session *s = PA_SINK_INPUT(o)->userdata;
131
132 switch (code) {
133 case PA_SINK_INPUT_MESSAGE_GET_LATENCY:
134 *((pa_usec_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
135
136 /* Fall through, the default handler will add in the extra
137 * latency added by the resampler */
138 break;
139 }
140
141 return pa_sink_input_process_msg(o, code, data, offset, chunk);
142 }
143
144 /* Called from I/O thread context */
145 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
146 struct session *s;
147 pa_sink_input_assert_ref(i);
148 pa_assert_se(s = i->userdata);
149
150 if (pa_memblockq_peek(s->memblockq, chunk) < 0)
151 return -1;
152
153 pa_memblockq_drop(s->memblockq, chunk->length);
154
155 return 0;
156 }
157
158 /* Called from I/O thread context */
159 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
160 struct session *s;
161
162 pa_sink_input_assert_ref(i);
163 pa_assert_se(s = i->userdata);
164
165 pa_memblockq_rewind(s->memblockq, nbytes);
166 }
167
168 /* Called from thread context */
169 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
170 struct session *s;
171
172 pa_sink_input_assert_ref(i);
173 pa_assert_se(s = i->userdata);
174
175 pa_memblockq_set_maxrewind(s->memblockq, nbytes);
176 }
177
178 /* Called from main context */
179 static void sink_input_kill(pa_sink_input* i) {
180 struct session *s;
181 pa_sink_input_assert_ref(i);
182 pa_assert_se(s = i->userdata);
183
184 session_free(s);
185 }
186
187 /* Called from I/O thread context */
188 static int rtpoll_work_cb(pa_rtpoll_item *i) {
189 pa_memchunk chunk;
190 int64_t k, j, delta;
191 struct timeval now;
192 struct session *s;
193 struct pollfd *p;
194
195 pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
196
197 p = pa_rtpoll_item_get_pollfd(i, NULL);
198
199 if (p->revents & (POLLERR|POLLNVAL|POLLHUP|POLLOUT)) {
200 pa_log("poll() signalled bad revents.");
201 return -1;
202 }
203
204 if ((p->revents & POLLIN) == 0)
205 return 0;
206
207 p->revents = 0;
208
209 if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool) < 0)
210 return 0;
211
212 if (s->sdp_info.payload != s->rtp_context.payload) {
213 pa_memblock_unref(chunk.memblock);
214 return 0;
215 }
216
217 if (!s->first_packet) {
218 s->first_packet = TRUE;
219
220 s->ssrc = s->rtp_context.ssrc;
221 s->offset = s->rtp_context.timestamp;
222
223 if (s->ssrc == s->userdata->module->core->cookie)
224 pa_log_warn("Detected RTP packet loop!");
225 } else {
226 if (s->ssrc != s->rtp_context.ssrc) {
227 pa_memblock_unref(chunk.memblock);
228 return 0;
229 }
230 }
231
232 /* Check whether there was a timestamp overflow */
233 k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
234 j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
235
236 if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
237 delta = k;
238 else
239 delta = j;
240
241 pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, TRUE);
242
243 pa_rtclock_get(&now);
244
245 pa_smoother_put(s->smoother, pa_timeval_load(&now), pa_bytes_to_usec((uint64_t) pa_memblockq_get_write_index(s->memblockq), &s->sink_input->sample_spec));
246
247 if (pa_memblockq_push(s->memblockq, &chunk) < 0) {
248 pa_log_warn("Queue overrun");
249 pa_memblockq_seek(s->memblockq, (int64_t) chunk.length, PA_SEEK_RELATIVE, TRUE);
250 }
251
252 /* pa_log("blocks in q: %u", pa_memblockq_get_nblocks(s->memblockq)); */
253
254 pa_memblock_unref(chunk.memblock);
255
256 /* The next timestamp we expect */
257 s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
258
259 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
260
261 if (s->last_rate_update + RATE_UPDATE_INTERVAL < pa_timeval_load(&now)) {
262 pa_usec_t wi, ri, render_delay, sink_delay = 0, latency, fix;
263 unsigned fix_samples;
264
265 pa_log_debug("Updating sample rate");
266
267 wi = pa_smoother_get(s->smoother, pa_timeval_load(&now));
268 ri = pa_bytes_to_usec((uint64_t) pa_memblockq_get_read_index(s->memblockq), &s->sink_input->sample_spec);
269
270 sink_delay = pa_sink_get_latency_within_thread(s->sink_input->sink);
271 render_delay = pa_bytes_to_usec(pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq), &s->sink_input->sink->sample_spec);
272
273 if (ri > render_delay+sink_delay)
274 ri -= render_delay+sink_delay;
275 else
276 ri = 0;
277
278 if (wi < ri)
279 latency = 0;
280 else
281 latency = wi - ri;
282
283 pa_log_debug("Write index deviates by %0.2f ms, expected %0.2f ms", (double) latency/PA_USEC_PER_MSEC, (double) s->intended_latency/PA_USEC_PER_MSEC);
284
285 /* Calculate deviation */
286 if (latency < s->intended_latency)
287 fix = s->intended_latency - latency;
288 else
289 fix = latency - s->intended_latency;
290
291 /* How many samples is this per second? */
292 fix_samples = (unsigned) (fix * (pa_usec_t) s->sink_input->thread_info.sample_spec.rate / (pa_usec_t) RATE_UPDATE_INTERVAL);
293
294 /* Check if deviation is in bounds */
295 if (fix_samples > s->sink_input->sample_spec.rate*.20)
296 pa_log_debug("Hmmm, rate fix is too large (%lu Hz), not applying.", (unsigned long) fix_samples);
297 else {
298 /* Fix up rate */
299 if (latency < s->intended_latency)
300 s->sink_input->sample_spec.rate -= fix_samples;
301 else
302 s->sink_input->sample_spec.rate += fix_samples;
303
304 if (s->sink_input->sample_spec.rate > PA_RATE_MAX)
305 s->sink_input->sample_spec.rate = PA_RATE_MAX;
306 }
307
308 pa_assert(pa_sample_spec_valid(&s->sink_input->sample_spec));
309
310 pa_resampler_set_input_rate(s->sink_input->thread_info.resampler, s->sink_input->sample_spec.rate);
311
312 pa_log_debug("Updated sampling rate to %lu Hz.", (unsigned long) s->sink_input->sample_spec.rate);
313
314 s->last_rate_update = pa_timeval_load(&now);
315 }
316
317 if (pa_memblockq_is_readable(s->memblockq) &&
318 s->sink_input->thread_info.underrun_for > 0) {
319 pa_log_debug("Requesting rewind due to end of underrun");
320 pa_sink_input_request_rewind(s->sink_input, 0, FALSE, TRUE, FALSE);
321 }
322
323 return 1;
324 }
325
326 /* Called from I/O thread context */
327 static void sink_input_attach(pa_sink_input *i) {
328 struct session *s;
329 struct pollfd *p;
330
331 pa_sink_input_assert_ref(i);
332 pa_assert_se(s = i->userdata);
333
334 pa_assert(!s->rtpoll_item);
335 s->rtpoll_item = pa_rtpoll_item_new(i->sink->rtpoll, PA_RTPOLL_LATE, 1);
336
337 p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
338 p->fd = s->rtp_context.fd;
339 p->events = POLLIN;
340 p->revents = 0;
341
342 pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
343 pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
344 }
345
346 /* Called from I/O thread context */
347 static void sink_input_detach(pa_sink_input *i) {
348 struct session *s;
349 pa_sink_input_assert_ref(i);
350 pa_assert_se(s = i->userdata);
351
352 pa_assert(s->rtpoll_item);
353 pa_rtpoll_item_free(s->rtpoll_item);
354 s->rtpoll_item = NULL;
355 }
356
357 static int mcast_socket(const struct sockaddr* sa, socklen_t salen) {
358 int af, fd = -1, r, one;
359
360 pa_assert(sa);
361 pa_assert(salen > 0);
362
363 af = sa->sa_family;
364 if ((fd = socket(af, SOCK_DGRAM, 0)) < 0) {
365 pa_log("Failed to create socket: %s", pa_cstrerror(errno));
366 goto fail;
367 }
368
369 one = 1;
370 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) {
371 pa_log("SO_REUSEADDR failed: %s", pa_cstrerror(errno));
372 goto fail;
373 }
374
375 if (af == AF_INET) {
376 struct ip_mreq mr4;
377 memset(&mr4, 0, sizeof(mr4));
378 mr4.imr_multiaddr = ((const struct sockaddr_in*) sa)->sin_addr;
379 r = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4));
380 #ifdef HAVE_IPV6
381 } else {
382 struct ipv6_mreq mr6;
383 memset(&mr6, 0, sizeof(mr6));
384 mr6.ipv6mr_multiaddr = ((const struct sockaddr_in6*) sa)->sin6_addr;
385 r = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6));
386 #endif
387 }
388
389 if (r < 0) {
390 pa_log_info("Joining mcast group failed: %s", pa_cstrerror(errno));
391 goto fail;
392 }
393
394 if (bind(fd, sa, salen) < 0) {
395 pa_log("bind() failed: %s", pa_cstrerror(errno));
396 goto fail;
397 }
398
399 return fd;
400
401 fail:
402 if (fd >= 0)
403 close(fd);
404
405 return -1;
406 }
407
408 static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_info) {
409 struct session *s = NULL;
410 pa_sink *sink;
411 int fd = -1;
412 pa_memchunk silence;
413 pa_sink_input_new_data data;
414 struct timeval now;
415
416 pa_assert(u);
417 pa_assert(sdp_info);
418
419 if (u->n_sessions >= MAX_SESSIONS) {
420 pa_log("Session limit reached.");
421 goto fail;
422 }
423
424 if (!(sink = pa_namereg_get(u->module->core, u->sink_name, PA_NAMEREG_SINK))) {
425 pa_log("Sink does not exist.");
426 goto fail;
427 }
428
429 pa_rtclock_get(&now);
430
431 s = pa_xnew0(struct session, 1);
432 s->userdata = u;
433 s->first_packet = FALSE;
434 s->sdp_info = *sdp_info;
435 s->rtpoll_item = NULL;
436 s->intended_latency = LATENCY_USEC;
437 s->smoother = pa_smoother_new(
438 PA_USEC_PER_SEC*5,
439 PA_USEC_PER_SEC*2,
440 TRUE,
441 TRUE,
442 10,
443 pa_timeval_load(&now),
444 FALSE);
445 s->last_rate_update = pa_timeval_load(&now);
446 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
447
448 if ((fd = mcast_socket((const struct sockaddr*) &sdp_info->sa, sdp_info->salen)) < 0)
449 goto fail;
450
451 pa_sink_input_new_data_init(&data);
452 data.sink = sink;
453 data.driver = __FILE__;
454 pa_proplist_sets(data.proplist, PA_PROP_MEDIA_ROLE, "stream");
455 pa_proplist_setf(data.proplist, PA_PROP_MEDIA_NAME,
456 "RTP Stream%s%s%s",
457 sdp_info->session_name ? " (" : "",
458 sdp_info->session_name ? sdp_info->session_name : "",
459 sdp_info->session_name ? ")" : "");
460
461 if (sdp_info->session_name)
462 pa_proplist_sets(data.proplist, "rtp.session", sdp_info->session_name);
463 pa_proplist_sets(data.proplist, "rtp.origin", sdp_info->origin);
464 pa_proplist_setf(data.proplist, "rtp.payload", "%u", (unsigned) sdp_info->payload);
465 data.module = u->module;
466 pa_sink_input_new_data_set_sample_spec(&data, &sdp_info->sample_spec);
467
468 pa_sink_input_new(&s->sink_input, u->module->core, &data, PA_SINK_INPUT_VARIABLE_RATE);
469 pa_sink_input_new_data_done(&data);
470
471 if (!s->sink_input) {
472 pa_log("Failed to create sink input.");
473 goto fail;
474 }
475
476 s->sink_input->userdata = s;
477
478 s->sink_input->parent.process_msg = sink_input_process_msg;
479 s->sink_input->pop = sink_input_pop_cb;
480 s->sink_input->process_rewind = sink_input_process_rewind_cb;
481 s->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
482 s->sink_input->kill = sink_input_kill;
483 s->sink_input->attach = sink_input_attach;
484 s->sink_input->detach = sink_input_detach;
485
486 pa_sink_input_get_silence(s->sink_input, &silence);
487
488 s->sink_latency = pa_sink_input_set_requested_latency(s->sink_input, s->intended_latency/2);
489
490 if (s->intended_latency < s->sink_latency*2)
491 s->intended_latency = s->sink_latency*2;
492
493 s->memblockq = pa_memblockq_new(
494 0,
495 MEMBLOCKQ_MAXLENGTH,
496 MEMBLOCKQ_MAXLENGTH,
497 pa_frame_size(&s->sink_input->sample_spec),
498 pa_usec_to_bytes(s->intended_latency - s->sink_latency, &s->sink_input->sample_spec),
499 0,
500 0,
501 &silence);
502
503 pa_memblock_unref(silence.memblock);
504
505 pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
506
507 pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
508 u->n_sessions++;
509 PA_LLIST_PREPEND(struct session, s->userdata->sessions, s);
510
511 pa_sink_input_put(s->sink_input);
512
513 pa_log_info("New session '%s'", s->sdp_info.session_name);
514
515 return s;
516
517 fail:
518 pa_xfree(s);
519
520 if (fd >= 0)
521 pa_close(fd);
522
523 return NULL;
524 }
525
526 static void session_free(struct session *s) {
527 pa_assert(s);
528
529 pa_log_info("Freeing session '%s'", s->sdp_info.session_name);
530
531 pa_sink_input_unlink(s->sink_input);
532 pa_sink_input_unref(s->sink_input);
533
534 PA_LLIST_REMOVE(struct session, s->userdata->sessions, s);
535 pa_assert(s->userdata->n_sessions >= 1);
536 s->userdata->n_sessions--;
537 pa_hashmap_remove(s->userdata->by_origin, s->sdp_info.origin);
538
539 pa_memblockq_free(s->memblockq);
540 pa_sdp_info_destroy(&s->sdp_info);
541 pa_rtp_context_destroy(&s->rtp_context);
542
543 pa_smoother_free(s->smoother);
544
545 pa_xfree(s);
546 }
547
548 static void sap_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_flags_t flags, void *userdata) {
549 struct userdata *u = userdata;
550 pa_bool_t goodbye = FALSE;
551 pa_sdp_info info;
552 struct session *s;
553
554 pa_assert(m);
555 pa_assert(e);
556 pa_assert(u);
557 pa_assert(fd == u->sap_context.fd);
558 pa_assert(flags == PA_IO_EVENT_INPUT);
559
560 if (pa_sap_recv(&u->sap_context, &goodbye) < 0)
561 return;
562
563 if (!pa_sdp_parse(u->sap_context.sdp_data, &info, goodbye))
564 return;
565
566 if (goodbye) {
567
568 if ((s = pa_hashmap_get(u->by_origin, info.origin)))
569 session_free(s);
570
571 pa_sdp_info_destroy(&info);
572 } else {
573
574 if (!(s = pa_hashmap_get(u->by_origin, info.origin))) {
575 if (!session_new(u, &info))
576 pa_sdp_info_destroy(&info);
577
578 } else {
579 struct timeval now;
580 pa_rtclock_get(&now);
581 pa_atomic_store(&s->timestamp, (int) now.tv_sec);
582
583 pa_sdp_info_destroy(&info);
584 }
585 }
586 }
587
588 static void check_death_event_cb(pa_mainloop_api *m, pa_time_event *t, const struct timeval *ptv, void *userdata) {
589 struct session *s, *n;
590 struct userdata *u = userdata;
591 struct timeval now;
592 struct timeval tv;
593
594 pa_assert(m);
595 pa_assert(t);
596 pa_assert(ptv);
597 pa_assert(u);
598
599 pa_rtclock_get(&now);
600
601 pa_log_debug("Checking for dead streams ...");
602
603 for (s = u->sessions; s; s = n) {
604 int k;
605 n = s->next;
606
607 k = pa_atomic_load(&s->timestamp);
608
609 if (k + DEATH_TIMEOUT < now.tv_sec)
610 session_free(s);
611 }
612
613 /* Restart timer */
614 pa_gettimeofday(&tv);
615 pa_timeval_add(&tv, DEATH_TIMEOUT*PA_USEC_PER_SEC);
616 m->time_restart(t, &tv);
617 }
618
619 int pa__init(pa_module*m) {
620 struct userdata *u;
621 pa_modargs *ma = NULL;
622 struct sockaddr_in sa4;
623 #ifdef HAVE_IPV6
624 struct sockaddr_in6 sa6;
625 #endif
626 struct sockaddr *sa;
627 socklen_t salen;
628 const char *sap_address;
629 int fd = -1;
630 struct timeval tv;
631
632 pa_assert(m);
633
634 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
635 pa_log("failed to parse module arguments");
636 goto fail;
637 }
638
639 sap_address = pa_modargs_get_value(ma, "sap_address", DEFAULT_SAP_ADDRESS);
640
641 if (inet_pton(AF_INET, sap_address, &sa4.sin_addr) > 0) {
642 sa4.sin_family = AF_INET;
643 sa4.sin_port = htons(SAP_PORT);
644 sa = (struct sockaddr*) &sa4;
645 salen = sizeof(sa4);
646 #ifdef HAVE_IPV6
647 } else if (inet_pton(AF_INET6, sap_address, &sa6.sin6_addr) > 0) {
648 sa6.sin6_family = AF_INET6;
649 sa6.sin6_port = htons(SAP_PORT);
650 sa = (struct sockaddr*) &sa6;
651 salen = sizeof(sa6);
652 #endif
653 } else {
654 pa_log("Invalid SAP address '%s'", sap_address);
655 goto fail;
656 }
657
658 if ((fd = mcast_socket(sa, salen)) < 0)
659 goto fail;
660
661 m->userdata = u = pa_xnew(struct userdata, 1);
662 u->module = m;
663 u->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
664
665 u->sap_event = m->core->mainloop->io_new(m->core->mainloop, fd, PA_IO_EVENT_INPUT, sap_event_cb, u);
666 pa_sap_context_init_recv(&u->sap_context, fd);
667
668 PA_LLIST_HEAD_INIT(struct session, u->sessions);
669 u->n_sessions = 0;
670 u->by_origin = pa_hashmap_new(pa_idxset_string_hash_func, pa_idxset_string_compare_func);
671
672 pa_gettimeofday(&tv);
673 pa_timeval_add(&tv, DEATH_TIMEOUT * PA_USEC_PER_SEC);
674 u->check_death_event = m->core->mainloop->time_new(m->core->mainloop, &tv, check_death_event_cb, u);
675
676 pa_modargs_free(ma);
677
678 return 0;
679
680 fail:
681 if (ma)
682 pa_modargs_free(ma);
683
684 if (fd >= 0)
685 pa_close(fd);
686
687 return -1;
688 }
689
690 void pa__done(pa_module*m) {
691 struct userdata *u;
692 struct session *s;
693
694 pa_assert(m);
695
696 if (!(u = m->userdata))
697 return;
698
699 if (u->sap_event)
700 m->core->mainloop->io_free(u->sap_event);
701
702 if (u->check_death_event)
703 m->core->mainloop->time_free(u->check_death_event);
704
705 pa_sap_context_destroy(&u->sap_context);
706
707 if (u->by_origin) {
708 while ((s = pa_hashmap_first(u->by_origin)))
709 session_free(s);
710
711 pa_hashmap_free(u->by_origin, NULL, NULL);
712 }
713
714 pa_xfree(u->sink_name);
715 pa_xfree(u);
716 }