4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as published
10 by the Free Software Foundation; either version 2 of the License,
11 or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/sink-input.h>
37 #include <pulsecore/source-output.h>
38 #include <pulsecore/client.h>
39 #include <pulsecore/sample-util.h>
40 #include <pulsecore/namereg.h>
41 #include <pulsecore/log.h>
42 #include <pulsecore/core-error.h>
44 #include "protocol-simple.h"
46 /* Don't allow more than this many concurrent connections */
47 #define MAX_CONNECTIONS 10
50 pa_protocol_simple
*protocol
;
52 pa_sink_input
*sink_input
;
53 pa_source_output
*source_output
;
55 pa_memblockq
*input_memblockq
, *output_memblockq
;
60 pa_memblock
*current_memblock
;
61 size_t memblock_index
, fragment_size
;
62 pa_atomic_int missing
;
66 struct pa_protocol_simple
{
69 pa_socket_server
*server
;
70 pa_idxset
*connections
;
72 pa_asyncmsgq
*asyncmsgq
;
80 pa_sample_spec sample_spec
;
81 char *source_name
, *sink_name
;
85 SINK_INPUT_MESSAGE_POST_DATA
= PA_SINK_INPUT_MESSAGE_MAX
, /* data from main loop to sink input */
89 MESSAGE_REQUEST_DATA
, /* data from source output to main loop */
90 MESSAGE_POST_DATA
/* data from source output to main loop */
94 #define PLAYBACK_BUFFER_SECONDS (.5)
95 #define PLAYBACK_BUFFER_FRAGMENTS (10)
96 #define RECORD_BUFFER_SECONDS (5)
97 #define RECORD_BUFFER_FRAGMENTS (100)
99 static void connection_free(struct connection
*c
) {
102 pa_idxset_remove_by_data(c
->protocol
->connections
, c
, NULL
);
105 pa_sink_input_disconnect(c
->sink_input
);
106 pa_sink_input_unref(c
->sink_input
);
109 if (c
->source_output
) {
110 pa_source_output_disconnect(c
->source_output
);
111 pa_source_output_unref(c
->source_output
);
114 if (c
->playback
.current_memblock
)
115 pa_memblock_unref(c
->playback
.current_memblock
);
118 pa_client_free(c
->client
);
120 pa_iochannel_free(c
->io
);
121 if (c
->input_memblockq
)
122 pa_memblockq_free(c
->input_memblockq
);
123 if (c
->output_memblockq
)
124 pa_memblockq_free(c
->output_memblockq
);
129 static int do_read(struct connection
*c
) {
137 if (!c
->sink_input
|| !(l
= pa_atomic_load(&c
->playback
.missing
)))
140 if (l
> c
->playback
.fragment_size
)
141 l
= c
->playback
.fragment_size
;
143 if (c
->playback
.current_memblock
)
144 if (pa_memblock_get_length(c
->playback
.current_memblock
) - c
->playback
.memblock_index
< l
) {
145 pa_memblock_unref(c
->playback
.current_memblock
);
146 c
->playback
.current_memblock
= NULL
;
147 c
->playback
.memblock_index
= 0;
150 if (!c
->playback
.current_memblock
) {
151 pa_assert_se(c
->playback
.current_memblock
= pa_memblock_new(c
->protocol
->core
->mempool
, l
));
152 c
->playback
.memblock_index
= 0;
155 p
= pa_memblock_acquire(c
->playback
.current_memblock
);
156 r
= pa_iochannel_read(c
->io
, (uint8_t*) p
+ c
->playback
.memblock_index
, l
);
157 pa_memblock_release(c
->playback
.current_memblock
);
161 if (errno
== EINTR
|| errno
== EAGAIN
)
164 pa_log_debug("read(): %s", r
== 0 ? "EOF" : pa_cstrerror(errno
));
168 chunk
.memblock
= c
->playback
.current_memblock
;
169 chunk
.index
= c
->playback
.memblock_index
;
172 c
->playback
.memblock_index
+= r
;
174 pa_asyncmsgq_post(c
->protocol
->asyncmsgq
, c
, MESSAGE_POST_DATA
, NULL
, &chunk
, NULL
, NULL
);
179 static int do_write(struct connection
*c
) {
186 if (!c
->source_output
)
189 if (pa_memblockq_peek(c
->output_memblockq
, &chunk
) < 0)
192 pa_assert(chunk
.memblock
);
193 pa_assert(chunk
.length
);
195 p
= pa_memblock_acquire(chunk
.memblock
);
196 r
= pa_iochannel_write(c
->io
, (uint8_t*) p
+chunk
.index
, chunk
.length
);
197 pa_memblock_release(chunk
.memblock
);
199 pa_memblock_unref(chunk
.memblock
);
203 if (errno
== EINTR
|| errno
== EAGAIN
)
206 pa_log("write(): %s", pa_cstrerror(errno
));
210 pa_memblockq_drop(c
->output_memblockq
, &chunk
, r
);
215 static void do_work(struct connection
*c
) {
221 if (pa_iochannel_is_readable(c
->io
)) {
224 } else if (pa_iochannel_is_hungup(c
->io
))
227 if (pa_iochannel_is_writable(c
->io
)) {
238 /* If there is a sink input, we first drain what we already have read before shutting down the connection */
241 pa_iochannel_free(c
->io
);
244 pa_memblockq_prebuf_disable(c
->input_memblockq
);
249 /*** sink_input callbacks ***/
251 /* Called from thread context */
252 static int sink_input_process_msg(pa_sink_input
*i
, int code
, void *userdata
, const pa_memchunk
*chunk
) {
261 case SINK_INPUT_MESSAGE_POST_DATA
: {
264 /* New data from the main loop */
265 pa_memblockq_push_align(c
->input_memblockq
, chunk
);
269 case PA_SINK_INPUT_MESSAGE_GET_LATENCY
: {
270 pa_usec_t
*r
= userdata
;
272 *r
= pa_bytes_to_usec(pa_memblockq_get_length(c
->input_memblockq
), &c
->sink_input
->sample_spec
);
274 /* Fall through, the default handler will add in the extra
275 * latency added by the resampler */
279 return pa_sink_input_process_msg(i
, code
, userdata
);
283 /* Called from thread context */
284 static int sink_input_peek_cb(pa_sink_input
*i
, pa_memchunk
*chunk
) {
292 r
= pa_memblockq_peek(c
->input_memblockq
, chunk
);
294 if (c
->dead
&& r
< 0)
300 /* Called from thread context */
301 static void sink_input_drop_cb(pa_sink_input
*i
, const pa_memchunk
*chunk
, size_t length
) {
302 struct connection
*c
= i
->userdata
;
309 old
= pa_memblockq_missing(c
->input_memblockq
);
310 pa_memblockq_drop(c
->input_memblockq
, chunk
, length
);
311 new = pa_memblockq_missing(c
->input_memblockq
);
313 pa_atomic_store(&c
->playback
.missing
, &new);
316 pa_asyncmsgq_post(c
->protocol
->asyncmsgq
, c
, MESSAGE_REQUEST_DATA
, NULL
, NULL
, NULL
, NULL
);
319 /* Called from main context */
320 static void sink_input_kill_cb(pa_sink_input
*i
) {
322 pa_assert(i
->userdata
);
324 connection_free((struct connection
*) i
->userdata
);
327 /*** source_output callbacks ***/
329 static void source_output_push_cb(pa_source_output
*o
, const pa_memchunk
*chunk
) {
330 struct connection
*c
;
337 pa_asyncmsgq_post(c
->protocol
->asyncmsgq
, c
, MESSAGE_REQUEST_DATA
, NULL
, chunk
, NULL
, NULL
);
340 static void source_output_kill_cb(pa_source_output
*o
) {
350 static pa_usec_t
source_output_get_latency_cb(pa_source_output
*o
) {
357 return pa_bytes_to_usec(pa_memblockq_get_length(c
->output_memblockq
), &c
->source_output
->sample_spec
);
360 /*** client callbacks ***/
362 static void client_kill_cb(pa_client
*client
) {
366 c
= client
->userdata
;
369 connection_free(client
);
372 /*** pa_iochannel callbacks ***/
374 static void io_callback(pa_iochannel
*io
, void *userdata
) {
375 struct connection
*c
= userdata
;
383 /*** socket_server callbacks ***/
385 static void on_connection(pa_socket_server
*s
, pa_iochannel
*io
, void *userdata
) {
386 pa_protocol_simple
*p
= userdata
;
387 struct connection
*c
= NULL
;
394 if (pa_idxset_size(p
->connections
)+1 > MAX_CONNECTIONS
) {
395 pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS
);
396 pa_iochannel_free(io
);
400 c
= pa_xnew(struct connection
, 1);
402 c
->sink_input
= NULL
;
403 c
->source_output
= NULL
;
404 c
->input_memblockq
= c
->output_memblockq
= NULL
;
406 c
->playback
.current_memblock
= NULL
;
407 c
->playback
.memblock_index
= 0;
408 c
->playback
.fragment_size
= 0;
410 pa_atomic_store(&c
->playback
.missing
, 0);
412 pa_iochannel_socket_peer_to_string(io
, cname
, sizeof(cname
));
413 pa_assert_se(c
->client
= pa_client_new(p
->core
, __FILE__
, cname
));
414 c
->client
->owner
= p
->module
;
415 c
->client
->kill
= client_kill_cb
;
416 c
->client
->userdata
= c
;
419 if (p
->mode
& PLAYBACK
) {
420 pa_sink_input_new_data data
;
423 pa_sink_input_new_data_init(&data
);
424 data
.driver
= __FILE__
;
425 data
.name
= c
->client
->name
;
426 pa_sink_input_new_data_set_sample_spec(&data
, &p
->sample_spec
);
427 data
.module
= p
->module
;
428 data
.client
= c
->client
;
430 if (!(c
->sink_input
= pa_sink_input_new(p
->core
, &data
, 0))) {
431 pa_log("Failed to create sink input.");
435 c
->sink_input
->peek
= sink_input_peek_cb
;
436 c
->sink_input
->drop
= sink_input_drop_cb
;
437 c
->sink_input
->kill
= sink_input_kill_cb
;
438 c
->sink_input
->get_latency
= sink_input_get_latency_cb
;
439 c
->sink_input
->userdata
= c
;
441 l
= (size_t) (pa_bytes_per_second(&p
->sample_spec
)*PLAYBACK_BUFFER_SECONDS
);
442 c
->input_memblockq
= pa_memblockq_new(
446 pa_frame_size(&p
->sample_spec
),
448 l
/PLAYBACK_BUFFER_FRAGMENTS
,
450 pa_assert(c
->input_memblockq
);
451 pa_iochannel_socket_set_rcvbuf(io
, l
/PLAYBACK_BUFFER_FRAGMENTS
*5);
452 c
->playback
.fragment_size
= l
/10;
454 pa_atomic_store(&c
->playback
.missing
, pa_memblockq_missing(c
->input_memblockq
));
456 pa_sink_input_put(c
->sink_input
);
459 if (p
->mode
& RECORD
) {
460 pa_source_output_new_data data
;
463 pa_source_output_new_data_init(&data
);
464 data
.driver
= __FILE__
;
465 data
.name
= c
->client
->name
;
466 pa_source_output_new_data_set_sample_spec(&data
, &p
->sample_spec
);
467 data
.module
= p
->module
;
468 data
.client
= c
->client
;
470 if (!(c
->source_output
= pa_source_output_new(p
->core
, &data
, 0))) {
471 pa_log("Failed to create source output.");
474 c
->source_output
->push
= source_output_push_cb
;
475 c
->source_output
->kill
= source_output_kill_cb
;
476 c
->source_output
->get_latency
= source_output_get_latency_cb
;
477 c
->source_output
->userdata
= c
;
479 l
= (size_t) (pa_bytes_per_second(&p
->sample_spec
)*RECORD_BUFFER_SECONDS
);
480 c
->output_memblockq
= pa_memblockq_new(
484 pa_frame_size(&p
->sample_spec
),
488 pa_iochannel_socket_set_sndbuf(io
, l
/RECORD_BUFFER_FRAGMENTS
*2);
490 pa_source_output_put(c
->source_output
);
494 pa_iochannel_set_callback(c
->io
, io_callback
, c
);
495 pa_idxset_put(p
->connections
, c
, NULL
);
504 static void asyncmsgq_cb(pa_mainloop_api
*api
, pa_io_event
* e
, int fd
, pa_io_event_flags_t events
, void *userdata
) {
505 pa_protocol_simple
*p
= userdata
;
506 int do_some_work
= 0;
508 pa_assert(pa_asyncmsgq_get_fd(p
->asyncmsgq
) == fd
);
509 pa_assert(events
== PA_IO_EVENT_INPUT
);
511 pa_asyncmsgq_after_poll(p
->asyncmsgq
);
517 /* Check whether there is a message for us to process */
518 while (pa_asyncmsgq_get(p
->asyncmsgq
, &object
, &code
, &data
) == 0) {
520 connection
*c
= object
;
526 case MESSAGE_REQUEST_DATA
:
530 case MESSAGE_POST_DATA
:
531 pa_memblockq_push(c
->output_memblockq
, chunk
);
536 pa_asyncmsgq_done(p
->asyncmsgq
);
539 if (pa_asyncmsgq_before_poll(p
->asyncmsgq
) == 0)
544 pa_protocol_simple
* pa_protocol_simple_new(pa_core
*core
, pa_socket_server
*server
, pa_module
*m
, pa_modargs
*ma
) {
545 pa_protocol_simple
* p
= NULL
;
552 p
= pa_xnew0(pa_protocol_simple
, 1);
556 p
->connections
= pa_idxset_new(NULL
, NULL
);
557 pa_assert_se(p
->asyncmsgq
= pa_asyncmsgq_new(0));
559 p
->sample_spec
= core
->default_sample_spec
;
560 if (pa_modargs_get_sample_spec(ma
, &p
->sample_spec
) < 0) {
561 pa_log("Failed to parse sample type specification.");
565 p
->source_name
= pa_xstrdup(pa_modargs_get_value(ma
, "source", NULL
));
566 p
->sink_name
= pa_xstrdup(pa_modargs_get_value(ma
, "sink", NULL
));
569 if (pa_modargs_get_value_boolean(ma
, "record", &enable
) < 0) {
570 pa_log("record= expects a numeric argument.");
573 p
->mode
= enable
? RECORD
: 0;
576 if (pa_modargs_get_value_boolean(ma
, "playback", &enable
) < 0) {
577 pa_log("playback= expects a numeric argument.");
580 p
->mode
|= enable
? PLAYBACK
: 0;
582 if ((p
->mode
& (RECORD
|PLAYBACK
)) == 0) {
583 pa_log("neither playback nor recording enabled for protocol.");
587 pa_socket_server_set_callback(p
->server
, on_connection
, p
);
589 pa_assert_se(pa_asyncmsgq_before_poll(p
->asyncmsgq
) == 0);
590 pa_assert_se(p
->asyncmsgq_event
= core
->mainloop
->io_event_new(core
->mainloop
, pa_asyncmsgq_get_fd(p
->asyncmsgq
), PA_IO_EVENT_INPUT
, p
));
596 pa_protocol_simple_free(p
);
602 void pa_protocol_simple_free(pa_protocol_simple
*p
) {
603 struct connection
*c
;
606 if (p
->connections
) {
607 while((c
= pa_idxset_first(p
->connections
, NULL
)))
610 pa_idxset_free(p
->connections
, NULL
, NULL
);
614 pa_socket_server_unref(p
->server
);
617 c
->mainloop
->io_event_free(c
->asyncmsgq_event
);
618 pa_asyncmsgq_after_poll(c
->asyncmsgq
);
619 pa_asyncmsgq_free(p
->asyncmsgq
);