]> code.delx.au - pulseaudio/commitdiff
main part of the native protocol
authorLennart Poettering <lennart@poettering.net>
Wed, 23 Jun 2004 23:17:30 +0000 (23:17 +0000)
committerLennart Poettering <lennart@poettering.net>
Wed, 23 Jun 2004 23:17:30 +0000 (23:17 +0000)
git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@31 fefdeb5f-60dc-0310-8127-8f9354f1896f

62 files changed:
bootstrap.sh
configure.ac
src/Makefile.am
src/cli.c
src/core.c
src/core.h
src/iochannel.c
src/iochannel.h
src/main.c
src/mainloop-api.c [new file with mode: 0644]
src/mainloop-api.h [new file with mode: 0644]
src/mainloop-signal.c [new file with mode: 0644]
src/mainloop-signal.h [new file with mode: 0644]
src/mainloop.c
src/mainloop.h
src/memblockq.c
src/memblockq.h
src/module-oss-mmap.c
src/module-oss.c
src/module-pipe-sink.c
src/module.c
src/oss.c
src/oss.h
src/pacat.c [new file with mode: 0644]
src/packet.c
src/pdispatch.c [new file with mode: 0644]
src/pdispatch.h [new file with mode: 0644]
src/polyp.c [new file with mode: 0644]
src/polyp.h [new file with mode: 0644]
src/polypdef.h [new file with mode: 0644]
src/protocol-cli.c
src/protocol-native-spec.h [new file with mode: 0644]
src/protocol-native.c
src/protocol-simple.c
src/pstream-util.c [new file with mode: 0644]
src/pstream-util.h [new file with mode: 0644]
src/pstream.c
src/pstream.h
src/queue.c
src/sample-util.c [new file with mode: 0644]
src/sample-util.h [new file with mode: 0644]
src/sample.c
src/sample.h
src/simple.c [new file with mode: 0644]
src/simple.h [new file with mode: 0644]
src/sink.c
src/sink.h
src/sinkinput.c
src/sinkinput.h
src/socket-client.c [new file with mode: 0644]
src/socket-client.h [new file with mode: 0644]
src/socket-server.c
src/socket-server.h
src/source.c
src/source.h
src/sourceoutput.c
src/sourceoutput.h
src/tagstruct.c
src/tagstruct.h
src/todo
src/util.c [new file with mode: 0644]
src/util.h [new file with mode: 0644]

index f26ceb15d2fbdbdf807d378148b0ee37303f9c24..c9880d85e7071014a808283a22727bbe55618f54 100755 (executable)
@@ -33,7 +33,7 @@ else
     automake -a -c
     autoconf -Wall
 
-    ./configure --sysconfdir=/etc "$@"
+    CFLAGS="-g -O0" ./configure --sysconfdir=/etc "$@"
 
     make clean
 fi
index 16376902be1f3be60fd2fd734f6c006e0f0664fd..3a14a061fbccfc8b646bfbbc69d538a4eb9aa011 100644 (file)
@@ -42,7 +42,7 @@ AC_PROG_LIBTOOL
 
 # If using GCC specifiy some additional parameters
 if test "x$GCC" = "xyes" ; then
-   CFLAGS="$CFLAGS -pipe -Wall -W"
+   CFLAGS="$CFLAGS -pipe -Wall -W -Wno-unused-parameter"
 fi
 
 AC_CONFIG_FILES([Makefile src/Makefile])
index 443a25f23c72f81c1b66d4c242e6d9b3a9895dd9..6ad1488f40871e294337d81243f6c1684a88152f 100644 (file)
 
 AM_CFLAGS=-ansi -D_GNU_SOURCE
 
-bin_PROGRAMS = polypaudio 
+bin_PROGRAMS = polypaudio pacat
 
-pkglib_LTLIBRARIES=libprotocol-simple.la module-simple-protocol-tcp.la \
-               libsocket-server.la module-pipe-sink.la libpstream.la libiochannel.la \
+pkglib_LTLIBRARIES=libiochannel.la libsocket-server.la libsocket-client.la \
+               libprotocol-simple.la module-simple-protocol-tcp.la \
+               module-pipe-sink.la libpstream.la \
                libpacket.la module-oss.la module-oss-mmap.la liboss.la libioline.la \
                libcli.la module-cli.la libtokenizer.la libdynarray.la \
                module-simple-protocol-unix.la module-cli-protocol-tcp.la \
-               libprotocol-cli.la libprotocol-native.la module-native-protocol-tcp.la \
-        module-native-protocol-unix.la module-cli-protocol-unix.la libtagstruct.la
+               libprotocol-cli.la module-cli-protocol-unix.la libtagstruct.la \
+               libpdispatch.la libprotocol-native.la libpstream-util.la \
+        module-native-protocol-tcp.la module-native-protocol-unix.la \
+               libpolyp.la
 
 polypaudio_SOURCES = idxset.c idxset.h \
                queue.c queue.h \
                strbuf.c strbuf.h \
+               main.c main.h \
                mainloop.c mainloop.h \
                memblock.c memblock.h \
                sample.c sample.h \
+               sample-util.c sample-util.h \
                memblockq.c memblockq.h \
                client.c client.h \
                core.c core.h \
-               main.c main.h \
                sourceoutput.c sourceoutput.h \
                sinkinput.c sinkinput.h \
                source.c source.h \
                sink.c sink.h \
-               module.c module.h
+               module.c module.h \
+               mainloop-signal.c mainloop-signal.h \
+               mainloop-api.c mainloop-api.h \
+               util.c util.h
+polypaudio_CFLAGS = $(AM_CFLAGS)
 
 polypaudio_INCLUDES = $(INCLTDL)
 polypaudio_LDADD = $(LIBLTDL) 
@@ -56,10 +64,22 @@ libsocket_server_la_SOURCES = socket-server.c socket-server.h
 libsocket_server_la_LDFLAGS = -avoid-version
 libsocket_server_la_LIBADD = libiochannel.la
 
+libsocket_client_la_SOURCES = socket-client.c socket-client.h
+libsocket_client_la_LDFLAGS = -avoid-version
+libsocket_client_la_LIBADD = libiochannel.la
+
 libpstream_la_SOURCES = pstream.c pstream.h
 libpstream_la_LDFLAGS = -avoid-version
 libpstream_la_LIBADD = libpacket.la
 
+libpstream_util_la_SOURCES = pstream-util.c pstream-util.h
+libpstream_util_la_LDFLAGS = -avoid-version
+libpstream_util_la_LIBADD = libpstream.la libtagstruct.la
+
+libpdispatch_la_SOURCES = pdispatch.c pdispatch.h
+libpdispatch_la_LDFLAGS = -avoid-version
+libpdispatch_la_LIBADD = libpacket.la libtagstruct.la
+
 libiochannel_la_SOURCES = iochannel.c iochannel.h
 libiochannel_la_LDFLAGS = -avoid-version
 
@@ -90,7 +110,7 @@ libprotocol_cli_la_LIBADD = libsocket-server.la libiochannel.la libcli.la
 
 libprotocol_native_la_SOURCES = protocol-native.c protocol-native.h
 libprotocol_native_la_LDFLAGS = -avoid-version
-libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la
+libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la libpstream-util.la
 
 libtagstruct_la_SOURCES = tagstruct.c tagstruct.h
 libtagstruct_la_LDFLAGS = -avoid-version
@@ -140,3 +160,24 @@ module_oss_mmap_la_LIBADD = libiochannel.la liboss.la
 module_cli_la_SOURCES = module-cli.c
 module_cli_la_LDFLAGS = -module -avoid-version
 module_cli_la_LIBADD = libcli.la libiochannel.la libtokenizer.la
+
+libpolyp_la_SOURCES = polyp.c polyp.h \
+               polypdef.h \
+               tagstruct.c tagstruct.h \
+               iochannel.c iochannel.h \
+               pstream.c pstream.h \
+               pstream-util.c pstream-util.h \
+               pdispatch.c pdispatch.h \
+               protocol-native-spec.h \
+               mainloop-api.c mainloop-api.h \
+               mainloop.c mainloop.h \
+               idxset.c idxset.h \
+               util.c util.h \
+               memblock.c memblock.h \
+               socket-client.c socket-client.h \
+               packet.c packet.h \
+               queue.c queue.h \
+               dynarray.c dynarray.h
+
+pacat_SOURCES = pacat.c
+pacat_LDADD = libpolyp.la
index ec484ace3284d7e752a3d98bae84a4ca9242b37a..0916235102ab8772513714718b6e8374285adc74 100644 (file)
--- a/src/cli.c
+++ b/src/cli.c
@@ -20,6 +20,8 @@ struct cli {
 
     void (*eof_callback)(struct cli *c, void *userdata);
     void *userdata;
+
+    struct client *client;
 };
 
 struct command {
@@ -63,6 +65,7 @@ static const struct command commands[] = {
 static const char prompt[] = ">>> ";
 
 struct cli* cli_new(struct core *core, struct iochannel *io) {
+    char cname[256];
     struct cli *c;
     assert(io);
 
@@ -75,16 +78,21 @@ struct cli* cli_new(struct core *core, struct iochannel *io) {
     c->userdata = NULL;
     c->eof_callback = NULL;
 
+    iochannel_peer_to_string(io, cname, sizeof(cname));
+    c->client = client_new(core, "CLI", cname);
+    assert(c->client);
+    
     ioline_set_callback(c->line, line_callback, c);
     ioline_puts(c->line, "Welcome to polypaudio! Use \"help\" for usage information.\n");
     ioline_puts(c->line, prompt);
-
+    
     return c;
 }
 
 void cli_free(struct cli *c) {
     assert(c);
     ioline_free(c->line);
+    client_free(c->client);
     free(c);
 }
 
@@ -135,7 +143,7 @@ void cli_set_eof_callback(struct cli *c, void (*cb)(struct cli*c, void *userdata
 
 static void cli_command_exit(struct cli *c, struct tokenizer *t) {
     assert(c && c->core && c->core->mainloop && t);
-    mainloop_quit(c->core->mainloop, -1);
+    c->core->mainloop->quit(c->core->mainloop, 0);
 }
 
 static void cli_command_help(struct cli *c, struct tokenizer *t) {
index 50248501a7c571b10fa36a7d576d62fb265bf195..d9df38e1e9c05714b1c7f99ef665363e538f14e3 100644 (file)
@@ -7,7 +7,7 @@
 #include "sink.h"
 #include "source.h"
 
-struct core* core_new(struct mainloop *m) {
+struct core* core_new(struct pa_mainloop_api *m) {
     struct core* c;
     c = malloc(sizeof(struct core));
     assert(c);
index f6f794b99eb12bc736e51bcef09e4e0f18433960..8c4c6233d3eba46edf829367676bdff3319941d3 100644 (file)
@@ -2,17 +2,17 @@
 #define foocorehfoo
 
 #include "idxset.h"
-#include "mainloop.h"
+#include "mainloop-api.h"
 
 struct core {
-    struct mainloop *mainloop;
+    struct pa_mainloop_api *mainloop;
 
     struct idxset *clients, *sinks, *sources, *sink_inputs, *source_outputs, *modules;
 
     uint32_t default_source_index, default_sink_index;
 };
 
-struct core* core_new(struct mainloop *m);
+struct core* core_new(struct pa_mainloop_api *m);
 void core_free(struct core*c);
 
 #endif
index f0c4c4998b7fb1420bfd183b869cf90b9554da67..910b7e0b3a8b07191f31bda3ce0bc7808b55b6eb 100644 (file)
@@ -4,10 +4,11 @@
 #include <unistd.h>
 
 #include "iochannel.h"
+#include "util.h"
 
 struct iochannel {
     int ifd, ofd;
-    struct mainloop* mainloop;
+    struct pa_mainloop_api* mainloop;
 
     void (*callback)(struct iochannel*io, void *userdata);
     void*userdata;
@@ -17,43 +18,45 @@ struct iochannel {
 
     int no_close;
 
-    struct mainloop_source* input_source, *output_source;
+    void* input_source, *output_source;
 };
 
 static void enable_mainloop_sources(struct iochannel *io) {
     assert(io);
 
     if (io->input_source == io->output_source) {
-        enum mainloop_io_event e = MAINLOOP_IO_EVENT_NULL;
+        enum pa_mainloop_api_io_events e = PA_MAINLOOP_API_IO_EVENT_NULL;
         assert(io->input_source);
         
         if (!io->readable)
-            e |= MAINLOOP_IO_EVENT_IN;
+            e |= PA_MAINLOOP_API_IO_EVENT_INPUT;
         if (!io->writable)
-            e |= MAINLOOP_IO_EVENT_OUT;
+            e |= PA_MAINLOOP_API_IO_EVENT_OUTPUT;
 
-        mainloop_source_io_set_events(io->input_source, e);
+        io->mainloop->enable_io(io->mainloop, io->input_source, e);
     } else {
         if (io->input_source)
-            mainloop_source_io_set_events(io->input_source, io->readable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_IN);
+            io->mainloop->enable_io(io->mainloop, io->input_source, io->readable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_INPUT);
         if (io->output_source)
-            mainloop_source_io_set_events(io->output_source, io->writable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_OUT);
+            io->mainloop->enable_io(io->mainloop, io->output_source, io->writable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_OUTPUT);
     }
 }
 
-static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event events, void *userdata) {
+static void callback(struct pa_mainloop_api* m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
     struct iochannel *io = userdata;
     int changed = 0;
-    assert(s && fd >= 0 && userdata);
+    assert(m && fd >= 0 && events && userdata);
 
-    if ((events & MAINLOOP_IO_EVENT_IN) && !io->readable) {
+    if ((events & PA_MAINLOOP_API_IO_EVENT_INPUT) && !io->readable) {
         io->readable = 1;
         changed = 1;
+        assert(id == io->input_source);
     }
     
-    if ((events & MAINLOOP_IO_EVENT_OUT) && !io->writable) {
+    if ((events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) && !io->writable) {
         io->writable = 1;
         changed = 1;
+        assert(id == io->output_source);
     }
 
     if (changed) {
@@ -64,15 +67,7 @@ static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event ev
     }
 }
 
-static void make_nonblock_fd(int fd) {
-    int v;
-
-    if ((v = fcntl(fd, F_GETFL)) >= 0)
-        if (!(v & O_NONBLOCK))
-            fcntl(fd, F_SETFL, v|O_NONBLOCK);
-}
-
-struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {
+struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd) {
     struct iochannel *io;
     assert(m && (ifd >= 0 || ofd >= 0));
 
@@ -90,18 +85,18 @@ struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {
     if (ifd == ofd) {
         assert(ifd >= 0);
         make_nonblock_fd(io->ifd);
-        io->input_source = io->output_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN|MAINLOOP_IO_EVENT_OUT, callback, io);
+        io->input_source = io->output_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_BOTH, callback, io);
     } else {
 
         if (ifd >= 0) {
             make_nonblock_fd(io->ifd);
-            io->input_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN, callback, io);
+            io->input_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, io);
         } else
             io->input_source = NULL;
 
         if (ofd >= 0) {
             make_nonblock_fd(io->ofd);
-            io->output_source = mainloop_source_new_io(m, ofd, MAINLOOP_IO_EVENT_OUT, callback, io);
+            io->output_source = m->source_io(m, ofd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, callback, io);
         } else
             io->output_source = NULL;
     }
@@ -120,9 +115,9 @@ void iochannel_free(struct iochannel*io) {
     }
 
     if (io->input_source)
-        mainloop_source_free(io->input_source);
+        io->mainloop->cancel_io(io->mainloop, io->input_source);
     if (io->output_source && io->output_source != io->input_source)
-        mainloop_source_free(io->output_source);
+        io->mainloop->cancel_io(io->mainloop, io->output_source);
     
     free(io);
 }
@@ -172,3 +167,8 @@ void iochannel_set_noclose(struct iochannel*io, int b) {
     assert(io);
     io->no_close = b;
 }
+
+void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l) {
+    assert(io && s && l);
+    peer_to_string(s, l, io->ifd);
+}
index 8ed8b87899b55545c2154284c1eacca8f5cdc1cc..b0465a194701021279cdd1a83d5e108314b817a8 100644 (file)
@@ -2,11 +2,11 @@
 #define fooiochannelhfoo
 
 #include <sys/types.h>
-#include "mainloop.h"
+#include "mainloop-api.h"
 
 struct iochannel;
 
-struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd);
+struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd);
 void iochannel_free(struct iochannel*io);
 
 ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l);
@@ -19,4 +19,6 @@ void iochannel_set_noclose(struct iochannel*io, int b);
 
 void iochannel_set_callback(struct iochannel*io, void (*callback)(struct iochannel*io, void *userdata), void *userdata);
 
+void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l);
+
 #endif
index f35505ecbad65a64e2b13f705e5732a4807f5a70..ef25b5e30a0016c5037a6a1180bb838742e64c3e 100644 (file)
@@ -8,46 +8,52 @@
 #include "core.h"
 #include "mainloop.h"
 #include "module.h"
+#include "mainloop-signal.h"
 
 int stdin_inuse = 0, stdout_inuse = 0;
 
-static void signal_callback(struct mainloop_source *m, int sig, void *userdata) {
-    mainloop_quit(mainloop_source_get_mainloop(m), -1);
+static struct pa_mainloop *mainloop;
+
+static void signal_callback(void *id, int sig, void *userdata) {
+    struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop);
+    m->quit(m, 1);
     fprintf(stderr, "main: got signal.\n");
 }
 
 int main(int argc, char *argv[]) {
-    struct mainloop *m;
     struct core *c;
-    int r;
+    int r, retval = 0;
 
     r = lt_dlinit();
     assert(r == 0);
     
-    m = mainloop_new();
-    assert(m);
-    c = core_new(m);
-    assert(c);
+    mainloop = pa_mainloop_new();
+    assert(mainloop);
 
-    mainloop_source_new_signal(m, SIGINT, signal_callback, NULL);
+    r = pa_signal_init(pa_mainloop_get_api(mainloop));
+    assert(r == 0);
+    pa_signal_register(SIGINT, signal_callback, NULL);
     signal(SIGPIPE, SIG_IGN);
 
+    c = core_new(pa_mainloop_get_api(mainloop));
+    assert(c);
+
     module_load(c, "module-oss-mmap", "/dev/dsp1");
     module_load(c, "module-pipe-sink", NULL);
     module_load(c, "module-simple-protocol-tcp", NULL);
     module_load(c, "module-cli", NULL);
     
     fprintf(stderr, "main: mainloop entry.\n");
-    while (mainloop_iterate(m, 1) == 0);
-/*        fprintf(stderr, "main: %u blocks\n", n_blocks);*/
+    if (pa_mainloop_run(mainloop, &retval) < 0)
+        retval = 1;
     fprintf(stderr, "main: mainloop exit.\n");
-
-    mainloop_run(m);
     
     core_free(c);
-    mainloop_free(m);
+
+    pa_signal_done();
+    pa_mainloop_free(mainloop);
 
     lt_dlexit();
     
-    return 0;
+    return retval;
 }
diff --git a/src/mainloop-api.c b/src/mainloop-api.c
new file mode 100644 (file)
index 0000000..6caa0c2
--- /dev/null
@@ -0,0 +1,35 @@
+#include <assert.h>
+#include <stdlib.h>
+#include "mainloop-api.h"
+
+struct once_info {
+    void (*callback)(void *userdata);
+    void *userdata;
+};
+
+static void once_callback(struct pa_mainloop_api *api, void *id, void *userdata) {
+    struct once_info *i = userdata;
+    assert(api && i && i->callback);
+    i->callback(i->userdata);
+    assert(api->cancel_fixed);
+    api->cancel_fixed(api, id);
+    free(i);
+}
+
+void pa_mainloop_api_once(struct pa_mainloop_api* api, void (*callback)(void *userdata), void *userdata) {
+    struct once_info *i;
+    void *id;
+    assert(api && callback);
+
+    i = malloc(sizeof(struct once_info));
+    assert(i);
+    i->callback = callback;
+    i->userdata = userdata;
+
+    assert(api->source_fixed);
+    id = api->source_fixed(api, once_callback, i);
+    assert(id);
+
+    /* Note: if the mainloop is destroyed before once_callback() was called, some memory is leaked. */
+}
+
diff --git a/src/mainloop-api.h b/src/mainloop-api.h
new file mode 100644 (file)
index 0000000..96dacc2
--- /dev/null
@@ -0,0 +1,43 @@
+#ifndef foomainloopapihfoo
+#define foomainloopapihfoo
+
+#include <time.h>
+#include <sys/time.h>
+
+enum pa_mainloop_api_io_events {
+    PA_MAINLOOP_API_IO_EVENT_NULL = 0,
+    PA_MAINLOOP_API_IO_EVENT_INPUT = 1,
+    PA_MAINLOOP_API_IO_EVENT_OUTPUT = 2,
+    PA_MAINLOOP_API_IO_EVENT_BOTH = 3
+};
+
+struct pa_mainloop_api {
+    void *userdata;
+
+    /* IO sources */
+    void* (*source_io)(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata);
+    void  (*enable_io)(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events);
+    void  (*cancel_io)(struct pa_mainloop_api*a, void* id);
+
+    /* Fixed sources */
+    void* (*source_fixed)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata);
+    void  (*enable_fixed)(struct pa_mainloop_api*a, void* id, int b);
+    void  (*cancel_fixed)(struct pa_mainloop_api*a, void* id);
+
+    /* Idle sources */
+    void* (*source_idle)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata);
+    void  (*enable_idle)(struct pa_mainloop_api*a, void* id, int b);
+    void  (*cancel_idle)(struct pa_mainloop_api*a, void* id);
+    
+    /* Time sources */
+    void* (*source_time)(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata);
+    void  (*enable_time)(struct pa_mainloop_api*a, void *id, const struct timeval *tv);
+    void  (*cancel_time)(struct pa_mainloop_api*a, void* id);
+
+    /* Exit mainloop */
+    void (*quit)(struct pa_mainloop_api*a, int retval);
+};
+
+void pa_mainloop_api_once(struct pa_mainloop_api*m, void (*callback)(void *userdata), void *userdata);
+
+#endif
diff --git a/src/mainloop-signal.c b/src/mainloop-signal.c
new file mode 100644 (file)
index 0000000..dcc72f6
--- /dev/null
@@ -0,0 +1,138 @@
+#include <stdio.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "mainloop-signal.h"
+#include "util.h"
+
+struct signal_info {
+    int sig;
+    struct sigaction saved_sigaction;
+    void (*callback) (void *id, int signal, void *userdata);
+    void *userdata;
+    struct signal_info *previous, *next;
+};
+
+static struct pa_mainloop_api *api = NULL;
+static int signal_pipe[2] = { -1, -1 };
+static void* mainloop_source = NULL;
+static struct signal_info *signals = NULL;
+
+static void signal_handler(int sig) {
+    write(signal_pipe[1], &sig, sizeof(sig));
+}
+
+static void callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
+    assert(a && id && events == PA_MAINLOOP_API_IO_EVENT_INPUT && id == mainloop_source && fd == signal_pipe[0]);
+
+    for (;;) {
+        ssize_t r;
+        int sig;
+        struct signal_info*s;
+        
+        if ((r = read(signal_pipe[0], &sig, sizeof(sig))) < 0) {
+            if (errno == EAGAIN)
+                return;
+
+            fprintf(stderr, "signal.c: read(): %s\n", strerror(errno));
+            return;
+        }
+
+        if (r != sizeof(sig)) {
+            fprintf(stderr, "signal.c: short read()\n");
+            return;
+        }
+
+        for (s = signals; s; s = s->next) 
+            if (s->sig == sig) {
+                assert(s->callback);
+                s->callback(s, sig, s->userdata);
+                break;
+            }
+    }
+}
+
+int pa_signal_init(struct pa_mainloop_api *a) {
+    assert(a);
+    if (pipe(signal_pipe) < 0) {
+        fprintf(stderr, "pipe() failed: %s\n", strerror(errno));
+        return -1;
+    }
+
+    make_nonblock_fd(signal_pipe[0]);
+    make_nonblock_fd(signal_pipe[1]);
+
+    api = a;
+    mainloop_source = api->source_io(api, signal_pipe[0], PA_MAINLOOP_API_IO_EVENT_INPUT, callback, NULL);
+    assert(mainloop_source);
+    return 0;
+}
+
+void pa_signal_done(void) {
+    assert(api && signal_pipe[0] >= 0 && signal_pipe[1] >= 0 && mainloop_source);
+
+    api->cancel_io(api, mainloop_source);
+    mainloop_source = NULL;
+
+    close(signal_pipe[0]);
+    close(signal_pipe[1]);
+    signal_pipe[0] = signal_pipe[1] = -1;
+
+    while (signals)
+        pa_signal_unregister(signals);
+    
+    api = NULL;
+}
+
+void* pa_signal_register(int sig, void (*callback) (void *id, int signal, void *userdata), void *userdata) {
+    struct signal_info *s = NULL;
+    struct sigaction sa;
+    assert(sig > 0 && callback);
+
+    for (s = signals; s; s = s->next)
+        if (s->sig == sig)
+            goto fail;
+    
+    s = malloc(sizeof(struct signal_info));
+    assert(s);
+    s->sig = sig;
+    s->callback = callback;
+    s->userdata = userdata;
+
+    memset(&sa, 0, sizeof(sa));
+    sa.sa_handler = signal_handler;
+    sigemptyset(&sa.sa_mask);
+    sa.sa_flags = SA_RESTART;
+    
+    if (sigaction(sig, &sa, &s->saved_sigaction) < 0)
+        goto fail;
+
+    s->previous = NULL;
+    s->next = signals;
+    signals = s;
+
+    return s;
+fail:
+    if (s)
+        free(s);
+    return NULL;
+}
+
+void pa_signal_unregister(void *id) {
+    struct signal_info *s = id;
+    assert(s);
+
+    if (s->next)
+        s->next->previous = s->previous;
+    if (s->previous)
+        s->previous->next = s->next;
+    else
+        signals = s->next;
+
+    sigaction(s->sig, &s->saved_sigaction, NULL);
+    free(s);
+}
diff --git a/src/mainloop-signal.h b/src/mainloop-signal.h
new file mode 100644 (file)
index 0000000..e3e2364
--- /dev/null
@@ -0,0 +1,12 @@
+#ifndef foomainloopsignalhfoo
+#define foomainloopsignalhfoo
+
+#include "mainloop-api.h"
+
+int pa_signal_init(struct pa_mainloop_api *api);
+void pa_signal_done(void);
+
+void* pa_signal_register(int signal, void (*callback) (void *id, int signal, void *userdata), void *userdata);
+void pa_signal_unregister(void *id);
+
+#endif
index fba0461c827043f859dc7e6f1ccb60bb8c7bef09..a1758c658992607c112e63a51369fa2e12446cf8 100644 (file)
@@ -1,3 +1,4 @@
+#include <stdio.h>
 #include <signal.h>
 #include <unistd.h>
 #include <sys/poll.h>
 #include <errno.h>
 
 #include "mainloop.h"
+#include "util.h"
+#include "idxset.h"
 
-struct mainloop_source {
-    struct mainloop_source *next;
-    struct mainloop *mainloop;
-    enum mainloop_source_type type;
-
-    int enabled;
+struct mainloop_source_header {
+    struct pa_mainloop *mainloop;
     int dead;
-    void *userdata;
-
-    struct {
-        int fd;
-        enum mainloop_io_event events;
-        void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata);
-        struct pollfd pollfd;
-    } io;
+};
     
-    struct  {
-        void (*callback)(struct mainloop_source*s, void *userdata);
-    } fixed;
+struct mainloop_source_io {
+    struct mainloop_source_header header;
     
-    struct  {
-        void (*callback)(struct mainloop_source*s, void *userdata);
-    } idle;
-
-    struct {
-        int sig;
-        struct sigaction sigaction;
-        void (*callback)(struct mainloop_source*s, int sig, void *userdata);
-    } signal;
+    int fd;
+    enum pa_mainloop_api_io_events events;
+    void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata);
+    void *userdata;
+
+    struct pollfd *pollfd;
 };
 
-struct mainloop_source_list {
-    struct mainloop_source *sources;
-    int n_sources;
-    int dead_sources;
+struct mainloop_source_fixed_or_idle {
+    struct mainloop_source_header header;
+    int enabled;
+
+    void (*callback)(struct pa_mainloop_api*a, void *id, void *userdata);
+    void *userdata;
 };
 
-struct mainloop {
-    struct mainloop_source_list io_sources, fixed_sources, idle_sources, signal_sources;
+struct mainloop_source_time {
+    struct mainloop_source_header header;
+    int enabled;
     
+    struct timeval timeval;
+    void (*callback)(struct pa_mainloop_api*a, void *id, const struct timeval*tv, void *userdata);
+    void *userdata;
+};
+
+struct pa_mainloop {
+    struct idxset *io_sources, *fixed_sources, *idle_sources, *time_sources;
+    int io_sources_scan_dead, fixed_sources_scan_dead, idle_sources_scan_dead, time_sources_scan_dead;
+
     struct pollfd *pollfds;
-    int max_pollfds, n_pollfds;
+    unsigned max_pollfds, n_pollfds;
     int rebuild_pollfds;
 
-    int quit;
-    int running;
-    int signal_pipe[2];
-    struct pollfd signal_pollfd;
+    int quit, running, retval;
+    struct pa_mainloop_api api;
 };
 
-static int signal_pipe = -1;
+static void setup_api(struct pa_mainloop *m);
 
-static void signal_func(int sig) {
-    if (signal_pipe >= 0)
-        write(signal_pipe, &sig, sizeof(sig));
-}
+struct pa_mainloop *pa_mainloop_new(void) {
+    struct pa_mainloop *m;
 
-static void make_nonblock(int fd) {
-    int v;
-    
-    if ((v = fcntl(fd, F_GETFL)) >= 0)
-        fcntl(fd, F_SETFL, v|O_NONBLOCK);
-}
+    m = malloc(sizeof(struct pa_mainloop));
+    assert(m);
 
+    m->io_sources = idxset_new(NULL, NULL);
+    m->fixed_sources = idxset_new(NULL, NULL);
+    m->idle_sources = idxset_new(NULL, NULL);
+    m->time_sources = idxset_new(NULL, NULL);
 
-struct mainloop *mainloop_new(void) {
-    int r;
-    struct mainloop *m;
+    assert(m->io_sources && m->fixed_sources && m->idle_sources && m->time_sources);
 
-    m = malloc(sizeof(struct mainloop));
-    assert(m);
-    memset(m, 0, sizeof(struct mainloop));
+    m->io_sources_scan_dead = m->fixed_sources_scan_dead = m->idle_sources_scan_dead = m->time_sources_scan_dead = 0;
+    
+    m->pollfds = NULL;
+    m->max_pollfds = m->n_pollfds = m->rebuild_pollfds = 0;
 
-    r = pipe(m->signal_pipe);
-    assert(r >= 0 && m->signal_pipe[0] >= 0 && m->signal_pipe[1] >= 0);
+    m->quit = m->running = m->retval = 0;
 
-    make_nonblock(m->signal_pipe[0]);
-    make_nonblock(m->signal_pipe[1]);
-    
-    signal_pipe = m->signal_pipe[1];
-    m->signal_pollfd.fd = m->signal_pipe[0];
-    m->signal_pollfd.events = POLLIN;
-    m->signal_pollfd.revents = 0;
+    setup_api(m);
     
     return m;
 }
 
-static void free_sources(struct mainloop_source_list *l, int all) {
-    struct mainloop_source *s, *p;
-    assert(l);
+static int foreach(void *p, uint32_t index, int *del, void*userdata) {
+    struct mainloop_source_header *h = p;
+    int *all = userdata;
+    assert(p && del && all);
 
-    if (!all && !l->dead_sources)
-        return;
-
-    p = NULL;
-    s = l->sources;
-    while (s) {
-        if (all || s->dead) {
-            struct mainloop_source *t = s;
-            s = s->next;
-
-            if (p)
-                p->next = s;
-            else
-                l->sources = s;
-            
-            free(t);
-        } else {
-            p = s;
-            s = s->next;
-        }
+    if (*all || h->dead) {
+        free(h);
+        *del = 1;
     }
 
-    l->dead_sources = 0;
-
-    if (all) {
-        assert(!l->sources);
-        l->n_sources = 0;
-    }
-}
+    return 0;
+};
 
-void mainloop_free(struct mainloop* m) {
+void pa_mainloop_free(struct pa_mainloop* m) {
+    int all = 1;
     assert(m);
-    free_sources(&m->io_sources, 1);
-    free_sources(&m->fixed_sources, 1);
-    free_sources(&m->idle_sources, 1);
-    free_sources(&m->signal_sources, 1);
-
-    if (signal_pipe == m->signal_pipe[1])
-        signal_pipe = -1;
-    close(m->signal_pipe[0]);
-    close(m->signal_pipe[1]);
-    
+    idxset_foreach(m->io_sources, foreach, &all);
+    idxset_foreach(m->fixed_sources, foreach, &all);
+    idxset_foreach(m->idle_sources, foreach, &all);
+    idxset_foreach(m->time_sources, foreach, &all);
+
+    idxset_free(m->io_sources, NULL, NULL);
+    idxset_free(m->fixed_sources, NULL, NULL);
+    idxset_free(m->idle_sources, NULL, NULL);
+    idxset_free(m->time_sources, NULL, NULL);
+
     free(m->pollfds);
     free(m);
 }
 
-static void rebuild_pollfds(struct mainloop *m) {
-    struct mainloop_source*s;
+static void scan_dead(struct pa_mainloop *m) {
+    int all = 0;
+    assert(m);
+    if (m->io_sources_scan_dead)
+        idxset_foreach(m->io_sources, foreach, &all);
+    if (m->fixed_sources_scan_dead)
+        idxset_foreach(m->fixed_sources, foreach, &all);
+    if (m->idle_sources_scan_dead)
+        idxset_foreach(m->idle_sources, foreach, &all);
+    if (m->time_sources_scan_dead)
+        idxset_foreach(m->time_sources, foreach, &all);
+}
+
+static void rebuild_pollfds(struct pa_mainloop *m) {
+    struct mainloop_source_io*s;
     struct pollfd *p;
-    
-    if (m->max_pollfds < m->io_sources.n_sources+1) {
-        m->max_pollfds = (m->io_sources.n_sources+1)*2;
-        m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*m->max_pollfds);
+    uint32_t index = IDXSET_INVALID;
+    unsigned l;
+
+    l = idxset_ncontents(m->io_sources);
+    if (m->max_pollfds < l) {
+        m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*l);
+        m->max_pollfds = l;
     }
 
     m->n_pollfds = 0;
     p = m->pollfds;
-    for (s = m->io_sources.sources; s; s = s->next) {
-        assert(s->type == MAINLOOP_SOURCE_TYPE_IO);
-        if (!s->dead && s->enabled && s->io.events != MAINLOOP_IO_EVENT_NULL) {
-            *(p++) = s->io.pollfd;
-            m->n_pollfds++;
+    for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) {
+        if (s->header.dead) {
+            s->pollfd = NULL;
+            continue;
         }
+
+        s->pollfd = p;
+        p->fd = s->fd;
+        p->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0);
+        p->revents = 0;
+
+        p++;
+        m->n_pollfds++;
     }
+}
+
+static void dispatch_pollfds(struct pa_mainloop *m) {
+    uint32_t index = IDXSET_INVALID;
+    struct mainloop_source_io *s;
 
-    *(p++) = m->signal_pollfd;
-    m->n_pollfds++;
+    for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) {
+        if (s->header.dead || !s->events || !s->pollfd || !s->pollfd->revents)
+            continue;
+        
+        assert(s->pollfd->revents <= s->pollfd->events && s->pollfd->fd == s->fd && s->callback);
+        s->callback(&m->api, s, s->fd, ((s->pollfd->revents & POLLIN) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata);
+    }
 }
 
-static void dispatch_pollfds(struct mainloop *m) {
-    int i;
-    struct pollfd *p;
-    struct mainloop_source *s;
-    /* This loop assumes that m->sources and m->pollfds have the same
-     * order and that m->pollfds is a subset of m->sources! */
+static void run_fixed_or_idle(struct pa_mainloop *m, struct idxset *i) {
+    uint32_t index = IDXSET_INVALID;
+    struct mainloop_source_fixed_or_idle *s;
 
-    s = m->io_sources.sources;
-    for (p = m->pollfds, i = 0; i < m->n_pollfds; p++, i++) {
-        if (!p->revents)
+    for (s = idxset_first(i, &index); s; s = idxset_next(i, &index)) {
+        if (s->header.dead || !s->enabled)
             continue;
 
-        if (p->fd == m->signal_pipe[0]) {
-            /* Event from signal pipe */
+        assert(s->callback);
+        s->callback(&m->api, s, s->userdata);
+    }
+}
+
+static int calc_next_timeout(struct pa_mainloop *m) {
+    uint32_t index = IDXSET_INVALID;
+    struct mainloop_source_time *s;
+    struct timeval now;
+    int t = -1;
 
-            if (p->revents & POLLIN) {
-                int sig;
-                ssize_t r;
-                r = read(m->signal_pipe[0], &sig, sizeof(sig));
-                assert((r < 0 && errno == EAGAIN) || r == sizeof(sig));
+    if (idxset_isempty(m->time_sources))
+        return -1;
+
+    gettimeofday(&now, NULL);
+    
+    for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) {
+        int tmp;
+        
+        if (s->header.dead || !s->enabled)
+            continue;
+
+        if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec)) 
+            return 0;
+
+        tmp = (s->timeval.tv_sec - now.tv_sec)*1000;
             
-                if (r == sizeof(sig)) {
-                    struct mainloop_source *l = m->signal_sources.sources;
-                    while (l) {
-                        assert(l->type == MAINLOOP_SOURCE_TYPE_SIGNAL);
-                        
-                        if (l->signal.sig == sig && l->enabled && !l->dead) {
-                            assert(l->signal.callback);
-                            l->signal.callback(l, sig, l->userdata);
-                        }
-                        
-                        l = l->next;
-                    }
-                }
-            }
-
-        } else {
-            /* Event from I/O source */
-
-            for (; s; s = s->next) {
-                if (p->fd != s->io.fd)
-                    continue;
-                
-                assert(s->type == MAINLOOP_SOURCE_TYPE_IO);
-
-                if (!s->dead && s->enabled) {
-                    enum mainloop_io_event e = (p->revents & POLLIN ? MAINLOOP_IO_EVENT_IN : 0) | (p->revents & POLLOUT ? MAINLOOP_IO_EVENT_OUT : 0);
-                    if (e) {
-                        assert(s->io.callback);
-                        s->io.callback(s, s->io.fd, e, s->userdata);
-                    }
-                }
-
-                break;
-            }
+        if (s->timeval.tv_usec > now.tv_usec)
+            tmp += (s->timeval.tv_usec - now.tv_usec)/1000;
+        else
+            tmp -= (now.tv_usec - s->timeval.tv_usec)/1000;
+
+        if (tmp == 0)
+            return 0;
+        else if (tmp < t)
+            t = tmp;
+    }
+
+    return t;
+}
+
+static void dispatch_timeout(struct pa_mainloop *m) {
+    uint32_t index = IDXSET_INVALID;
+    struct mainloop_source_time *s;
+    struct timeval now;
+    assert(m);
+
+    if (idxset_isempty(m->time_sources))
+        return;
+
+    gettimeofday(&now, NULL);
+    for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) {
+        
+        if (s->header.dead || !s->enabled)
+            continue;
+
+        if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec)) {
+            assert(s->callback);
+
+            s->enabled = 0;
+            s->callback(&m->api, s, &s->timeval, s->userdata);
         }
     }
 }
 
-int mainloop_iterate(struct mainloop *m, int block) {
-    struct mainloop_source *s;
-    int c;
+static int any_idle_sources(struct pa_mainloop *m) {
+    struct mainloop_source_fixed_or_idle *s;
+    uint32_t index;
+    assert(m);
+    
+    for (s = idxset_first(m->idle_sources, &index); s; s = idxset_next(m->idle_sources, &index))
+        if (!s->header.dead && s->enabled)
+            return 1;
+
+    return 0;
+}
+
+int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval) {
+    int r, idle;
     assert(m && !m->running);
     
-    if(m->quit)
-        return m->quit;
-
-    free_sources(&m->io_sources, 0);
-    free_sources(&m->fixed_sources, 0);
-    free_sources(&m->idle_sources, 0);
-    free_sources(&m->signal_sources, 0);
-
-    for (s = m->fixed_sources.sources; s; s = s->next) {
-        assert(s->type == MAINLOOP_SOURCE_TYPE_FIXED);
-        if (!s->dead && s->enabled) {
-            assert(s->fixed.callback);
-            s->fixed.callback(s, s->userdata);
-        }   
+    if(m->quit) {
+        if (retval)
+            *retval = m->retval;
+        return 1;
     }
 
+    m->running = 1;
+
+    scan_dead(m);
+    run_fixed_or_idle(m, m->fixed_sources);
+
     if (m->rebuild_pollfds) {
         rebuild_pollfds(m);
         m->rebuild_pollfds = 0;
     }
 
-    m->running = 1;
+    idle = any_idle_sources(m);
 
     do {
-        c = poll(m->pollfds, m->n_pollfds, (block && !m->idle_sources.n_sources) ? -1 : 0);
-    } while (c < 0 && errno == EINTR);
-        
-    if (c > 0)
+        int t;
+
+        if (!block || idle)
+            t = 0;
+        else 
+            t = calc_next_timeout(m);
+            
+        r = poll(m->pollfds, m->n_pollfds, t);
+    } while (r < 0 && errno == EINTR);
+
+    dispatch_timeout(m);
+    
+    if (r > 0)
         dispatch_pollfds(m);
-    else if (c == 0) {
-        for (s = m->idle_sources.sources; s; s = s->next) {
-            assert(s->type == MAINLOOP_SOURCE_TYPE_IDLE);
-            if (!s->dead && s->enabled) {
-                assert(s->idle.callback);
-                s->idle.callback(s, s->userdata);
-            }
-        }
-    }
+    else if (r == 0 && idle)
+        run_fixed_or_idle(m, m->idle_sources);
+    else if (r < 0)
+        fprintf(stderr, "select(): %s\n", strerror(errno));
     
     m->running = 0;
-    return c < 0 ? -1 : 0;
+    return r < 0 ? -1 : 0;
 }
 
-int mainloop_run(struct mainloop *m) {
+int pa_mainloop_run(struct pa_mainloop *m, int *retval) {
     int r;
-    while (!(r = mainloop_iterate(m, 1)));
+    while ((r = pa_mainloop_iterate(m, 1, retval)) == 0);
     return r;
 }
 
-void mainloop_quit(struct mainloop *m, int r) {
+void pa_mainloop_quit(struct pa_mainloop *m, int r) {
     assert(m);
     m->quit = r;
 }
 
-static struct mainloop_source_list* get_source_list(struct mainloop *m, enum mainloop_source_type type) {
-    struct mainloop_source_list *l;
-    
-    switch(type) {
-        case MAINLOOP_SOURCE_TYPE_IO:
-            l = &m->io_sources;
-            break;
-        case MAINLOOP_SOURCE_TYPE_FIXED:
-            l = &m->fixed_sources;
-            break;
-        case MAINLOOP_SOURCE_TYPE_IDLE:
-            l = &m->idle_sources;
-            break;
-        case MAINLOOP_SOURCE_TYPE_SIGNAL:
-            l = &m->signal_sources;
-            break;
-        default:
-            l = NULL;
-            break;
-    }
-    
-    return l;
-}
-
-static struct mainloop_source *source_new(struct mainloop*m, enum mainloop_source_type type) {
-    struct mainloop_source_list *l;
-    struct mainloop_source* s;
-    assert(m);
+/* IO sources */
+static void* mainloop_source_io(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata) {
+    struct pa_mainloop *m;
+    struct mainloop_source_io *s;
+    assert(a && a->userdata && fd >= 0 && callback);
+    m = a->userdata;
+    assert(a == &m->api);
 
-    s = malloc(sizeof(struct mainloop_source));
+    s = malloc(sizeof(struct mainloop_source_io));
     assert(s);
-    memset(s, 0, sizeof(struct mainloop_source));
+    s->header.mainloop = m;
+    s->header.dead = 0;
 
-    s->type = type;
-    s->mainloop = m;
+    s->fd = fd;
+    s->events = events;
+    s->callback = callback;
+    s->userdata = userdata;
+    s->pollfd = NULL;
 
-    l = get_source_list(m, type);
-    assert(l);
-            
-    s->next = l->sources;
-    l->sources = s;
-    l->n_sources++;
+    idxset_put(m->io_sources, s, NULL);
+    m->rebuild_pollfds = 1;
     return s;
 }
 
-struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata) {
-    struct mainloop_source* s;
-    assert(m && fd>=0 && callback);
+static void mainloop_enable_io(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events) {
+    struct pa_mainloop *m;
+    struct mainloop_source_io *s = id;
+    assert(a && a->userdata && s && !s->header.dead);
+    m = a->userdata;
+    assert(a == &m->api && s->header.mainloop == m);
 
-    s = source_new(m, MAINLOOP_SOURCE_TYPE_IO);
+    s->events = events;
+    if (s->pollfd)
+        s->pollfd->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0);
+}
 
-    s->io.fd = fd;
-    s->io.events = event;
-    s->io.callback = callback;
-    s->userdata = userdata;
-    s->io.pollfd.fd = fd;
-    s->io.pollfd.events = (event & MAINLOOP_IO_EVENT_IN ? POLLIN : 0) | (event & MAINLOOP_IO_EVENT_OUT ? POLLOUT : 0);
-    s->io.pollfd.revents = 0;
+static void mainloop_cancel_io(struct pa_mainloop_api*a, void* id) {
+    struct pa_mainloop *m;
+    struct mainloop_source_io *s = id;
+    assert(a && a->userdata && s && !s->header.dead);
+    m = a->userdata;
+    assert(a == &m->api && s->header.mainloop == m);
 
-    s->enabled = 1;
-
-    m->rebuild_pollfds = 1;
-    return s;
+    s->header.dead = 1;
+    m->io_sources_scan_dead = 1;
 }
 
-struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
-    struct mainloop_source* s;
-    assert(m && callback);
+/* Fixed sources */
+static void* mainloop_source_fixed(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) {
+    struct pa_mainloop *m;
+    struct mainloop_source_fixed_or_idle *s;
+    assert(a && a->userdata && callback);
+    m = a->userdata;
+    assert(a == &m->api);
 
-    s = source_new(m, MAINLOOP_SOURCE_TYPE_FIXED);
+    s = malloc(sizeof(struct mainloop_source_fixed_or_idle));
+    assert(s);
+    s->header.mainloop = m;
+    s->header.dead = 0;
 
-    s->fixed.callback = callback;
-    s->userdata = userdata;
     s->enabled = 1;
+    s->callback = callback;
+    s->userdata = userdata;
+
+    idxset_put(m->fixed_sources, s, NULL);
     return s;
 }
 
-struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
-    struct mainloop_source* s;
-    assert(m && callback);
-
-    s = source_new(m, MAINLOOP_SOURCE_TYPE_IDLE);
+static void mainloop_enable_fixed(struct pa_mainloop_api*a, void* id, int b) {
+    struct pa_mainloop *m;
+    struct mainloop_source_fixed_or_idle *s = id;
+    assert(a && a->userdata && s && !s->header.dead);
+    m = a->userdata;
+    assert(a == &m->api);
 
-    s->idle.callback = callback;
-    s->userdata = userdata;
-    s->enabled = 1;
-    return s;
+    s->enabled = b;
 }
 
-struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata) {
-    struct mainloop_source* s;
-    struct sigaction save_sa, sa;
-    
-    assert(m && callback);
+static void mainloop_cancel_fixed(struct pa_mainloop_api*a, void* id) {
+    struct pa_mainloop *m;
+    struct mainloop_source_fixed_or_idle *s = id;
+    assert(a && a->userdata && s && !s->header.dead);
+    m = a->userdata;
+    assert(a == &m->api);
 
-    memset(&sa, 0, sizeof(sa));
-    sa.sa_handler = signal_func;
-    sa.sa_flags = SA_RESTART;
-    sigemptyset(&sa.sa_mask);
+    s->header.dead = 1;
+    m->fixed_sources_scan_dead = 1;
+}
 
-    memset(&save_sa, 0, sizeof(save_sa));
+/* Idle sources */
+static void* mainloop_source_idle(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) {
+    struct pa_mainloop *m;
+    struct mainloop_source_fixed_or_idle *s;
+    assert(a && a->userdata && callback);
+    m = a->userdata;
+    assert(a == &m->api);
+
+    s = malloc(sizeof(struct mainloop_source_fixed_or_idle));
+    assert(s);
+    s->header.mainloop = m;
+    s->header.dead = 0;
 
-    if (sigaction(sig, &sa, &save_sa) < 0)
-        return NULL;
-    
-    s = source_new(m, MAINLOOP_SOURCE_TYPE_SIGNAL);
-    s->signal.sig = sig;
-    s->signal.sigaction = save_sa;
-    
-    s->signal.callback = callback;
-    s->userdata = userdata;
     s->enabled = 1;
+    s->callback = callback;
+    s->userdata = userdata;
+
+    idxset_put(m->idle_sources, s, NULL);
     return s;
 }
 
-void mainloop_source_free(struct mainloop_source*s) {
-    struct mainloop_source_list *l;
-    assert(s && !s->dead);
-    s->dead = 1;
-
-    assert(s->mainloop);
-    l = get_source_list(s->mainloop, s->type);
-    assert(l);
+static void mainloop_cancel_idle(struct pa_mainloop_api*a, void* id) {
+    struct pa_mainloop *m;
+    struct mainloop_source_fixed_or_idle *s = id;
+    assert(a && a->userdata && s && !s->header.dead);
+    m = a->userdata;
+    assert(a == &m->api);
 
-    l->n_sources--;
-    l->dead_sources = 1;
-
-    if (s->type == MAINLOOP_SOURCE_TYPE_IO)
-        s->mainloop->rebuild_pollfds = 1;
-    else if (s->type == MAINLOOP_SOURCE_TYPE_SIGNAL)
-        sigaction(s->signal.sig, &s->signal.sigaction, NULL);
+    s->header.dead = 1;
+    m->idle_sources_scan_dead = 1;
 }
 
-void mainloop_source_enable(struct mainloop_source*s, int b) {
-    assert(s && !s->dead);
+/* Time sources */
+static void* mainloop_source_time(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata) {
+    struct pa_mainloop *m;
+    struct mainloop_source_time *s;
+    assert(a && a->userdata && callback);
+    m = a->userdata;
+    assert(a == &m->api);
 
-    if (s->type == MAINLOOP_SOURCE_TYPE_IO && ((s->enabled && !b) || (!s->enabled && b))) {
-        assert(s->mainloop);
-        s->mainloop->rebuild_pollfds = 1;
-    }
+    s = malloc(sizeof(struct mainloop_source_time));
+    assert(s);
+    s->header.mainloop = m;
+    s->header.dead = 0;
 
-    s->enabled = b;
-}
+    s->enabled = !!tv;
+    if (tv)
+        s->timeval = *tv;
 
-void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event events) {
-    assert(s && !s->dead && s->type == MAINLOOP_SOURCE_TYPE_IO);
+    s->callback = callback;
+    s->userdata = userdata;
 
-    if (s->io.events != events) {
-        assert(s->mainloop);
-        s->mainloop->rebuild_pollfds = 1;
-    }
+    idxset_put(m->time_sources, s, NULL);
+    return s;
+}
 
-    s->io.events = events;
-    s->io.pollfd.events = ((events & MAINLOOP_IO_EVENT_IN) ? POLLIN : 0) | ((events & MAINLOOP_IO_EVENT_OUT) ? POLLOUT : 0);
+static void mainloop_enable_time(struct pa_mainloop_api*a, void *id, const struct timeval *tv) {
+    struct pa_mainloop *m;
+    struct mainloop_source_time *s = id;
+    assert(a && a->userdata && s && !s->header.dead);
+    m = a->userdata;
+    assert(a == &m->api);
+
+    if (tv) {
+        s->enabled = 1;
+        s->timeval = *tv;
+    } else
+        s->enabled = 0;
 }
 
-struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s) {
-    assert(s);
+static void mainloop_cancel_time(struct pa_mainloop_api*a, void* id) {
+    struct pa_mainloop *m;
+    struct mainloop_source_time *s = id;
+    assert(a && a->userdata && s && !s->header.dead);
+    m = a->userdata;
+    assert(a == &m->api);
+
+    s->header.dead = 1;
+    m->time_sources_scan_dead = 1;
 
-    return s->mainloop;
 }
 
-struct once_info {
-    void (*callback)(void *userdata);
-    void *userdata;
-};
+static void mainloop_quit(struct pa_mainloop_api*a, int retval) {
+    struct pa_mainloop *m;
+    assert(a && a->userdata);
+    m = a->userdata;
+    assert(a == &m->api);
 
-static void once_callback(struct mainloop_source *s, void *userdata) {
-    struct once_info *i = userdata;
-    assert(s && i && i->callback);
-    i->callback(i->userdata);
-    mainloop_source_free(s);
-    free(i);
+    m->quit = 1;
+    m->retval = retval;
 }
+    
+static void setup_api(struct pa_mainloop *m) {
+    assert(m);
+    
+    m->api.userdata = m;
+    m->api.source_io = mainloop_source_io;
+    m->api.enable_io = mainloop_enable_io;
+    m->api.cancel_io = mainloop_cancel_io;
+
+    m->api.source_fixed = mainloop_source_fixed;
+    m->api.enable_fixed = mainloop_enable_fixed;
+    m->api.cancel_fixed = mainloop_cancel_fixed;
+
+    m->api.source_idle = mainloop_source_idle;
+    m->api.enable_idle = mainloop_enable_fixed; /* (!) */
+    m->api.cancel_idle = mainloop_cancel_idle;
+    
+    m->api.source_time = mainloop_source_time;
+    m->api.enable_time = mainloop_enable_time;
+    m->api.cancel_time = mainloop_cancel_time;
 
-void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata) {
-    struct once_info *i;
-    assert(m && callback);
+    m->api.quit = mainloop_quit;
+}
 
-    i = malloc(sizeof(struct once_info));
-    assert(i);
-    i->callback = callback;
-    i->userdata = userdata;
-    
-    mainloop_source_new_fixed(m, once_callback, i);
+struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m) {
+    assert(m);
+    return &m->api;
 }
+        
index c1bfc62af21a959fcf150cd3b9eba03084924ade..7837636f94d02bf63971393739bae1a1e2d388ac 100644 (file)
@@ -1,42 +1,16 @@
 #ifndef foomainloophfoo
 #define foomainloophfoo
 
-struct mainloop;
-struct mainloop_source;
+#include "mainloop-api.h"
 
-enum mainloop_io_event {
-    MAINLOOP_IO_EVENT_NULL = 0,
-    MAINLOOP_IO_EVENT_IN = 1,
-    MAINLOOP_IO_EVENT_OUT = 2,
-    MAINLOOP_IO_EVENT_BOTH = 3
-};
+struct pa_mainloop;
 
-enum mainloop_source_type {
-    MAINLOOP_SOURCE_TYPE_IO,
-    MAINLOOP_SOURCE_TYPE_FIXED, 
-    MAINLOOP_SOURCE_TYPE_IDLE,
-    MAINLOOP_SOURCE_TYPE_SIGNAL
-};
+struct pa_mainloop *pa_mainloop_new(void);
+void pa_mainloop_free(struct pa_mainloop* m);
 
-struct mainloop *mainloop_new(void);
-void mainloop_free(struct mainloop* m);
+int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval);
+int pa_mainloop_run(struct pa_mainloop *m, int *retval);
 
-int mainloop_iterate(struct mainloop *m, int block);
-int mainloop_run(struct mainloop *m);
-void mainloop_quit(struct mainloop *m, int r);
-
-struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata);
-struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
-struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
-struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata);
-
-void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata);
-
-void mainloop_source_free(struct mainloop_source*s);
-void mainloop_source_enable(struct mainloop_source*s, int b);
-
-void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event event);
-
-struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s);
+struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m);
 
 #endif
index 10c59e5074ba4d604c7033d24362b8d4907400af..9a601c3ae8ae2f32e93dfd074591af9db204f0d2 100644 (file)
@@ -220,3 +220,12 @@ uint32_t memblockq_get_length(struct memblockq *bq) {
     assert(bq);
     return bq->total_length;
 }
+
+uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen) {
+    assert(bq && qlen);
+
+    if (bq->total_length >= qlen)
+        return 0;
+
+    return qlen - bq->total_length;
+}
index 0a68ddaff4453f94c3261085afbe5dbcfa1ce6ee..a681ff080d8fcb3d0682b153a6d383962e2cbdd0 100644 (file)
@@ -25,4 +25,6 @@ int memblockq_is_writable(struct memblockq *bq, size_t length);
 uint32_t memblockq_get_delay(struct memblockq *bq);
 uint32_t memblockq_get_length(struct memblockq *bq);
 
+uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen);
+
 #endif
index bfc3a6fa13b95742f802af1861f3314f5fc112af..b70ea6bdac5bd05713cf780b934655142bd771ae 100644 (file)
 #include "source.h"
 #include "module.h"
 #include "oss.h"
+#include "sample-util.h"
 
 struct userdata {
     struct sink *sink;
     struct source *source;
     struct core *core;
-    struct sample_spec sample_spec;
+    struct pa_sample_spec sample_spec;
 
-    size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, sample_size, out_fill;
-    uint32_t sample_usec;
+    size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, out_fill;
 
     int fd;
 
@@ -161,12 +161,14 @@ static void do_read(struct userdata *u) {
     in_clear_memblocks(u, u->in_fragments/2);
 };
 
-static void io_callback(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata) {
+static void io_callback(struct pa_mainloop_api *m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
     struct userdata *u = userdata;
 
-    if (event & MAINLOOP_IO_EVENT_IN)
+    assert (u && u->core->mainloop == m && u->mainloop_source == id);
+
+    if (events & PA_MAINLOOP_API_IO_EVENT_INPUT)
         do_read(u);
-    if (event & MAINLOOP_IO_EVENT_OUT)
+    if (events & PA_MAINLOOP_API_IO_EVENT_OUTPUT)
         do_write(u);
 }
 
@@ -175,7 +177,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) {
     assert(s && u);
 
     do_write(u);
-    return u->out_fill/u->sample_size*u->sample_usec;
+    return pa_samples_usec(u->out_fill, &s->sample_spec);
 }
 
 int module_init(struct core *c, struct module*m) {
@@ -316,10 +318,7 @@ int module_init(struct core *c, struct module*m) {
         
     assert(u->source || u->sink);
 
-    u->sample_size = sample_size(&u->sample_spec);
-    u->sample_usec = 1000000/u->sample_spec.rate;
-
-    u->mainloop_source = mainloop_source_new_io(c->mainloop, u->fd, (u->source ? MAINLOOP_IO_EVENT_IN : 0) | (u->sink ? MAINLOOP_IO_EVENT_OUT : 0), io_callback, u);
+    u->mainloop_source = c->mainloop->source_io(c->mainloop, u->fd, (u->source ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | (u->sink ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), io_callback, u);
     assert(u->mainloop_source);
 
     return 0;
@@ -360,7 +359,7 @@ void module_done(struct core *c, struct module*m) {
         source_free(u->source);
 
     if (u->mainloop_source)
-        mainloop_source_free(u->mainloop_source);
+        u->core->mainloop->cancel_io(u->core->mainloop, u->mainloop_source);
 
     if (u->fd >= 0)
         close(u->fd);
index fab5c8c557103eabc4d950c5a85d53934e428189..4f3075453f916df989d21aa8c6c81c363068ea82 100644 (file)
@@ -15,7 +15,7 @@
 #include "source.h"
 #include "module.h"
 #include "oss.h"
-#include "sample.h"
+#include "sample-util.h"
 
 struct userdata {
     struct sink *sink;
@@ -81,7 +81,7 @@ static void do_read(struct userdata *u) {
         return;
     }
 
-    assert(r <= memchunk.memblock->length);
+    assert(r <= (ssize_t) memchunk.memblock->length);
     memchunk.length = memchunk.memblock->length = r;
     memchunk.index = 0;
 
@@ -107,7 +107,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) {
         return 0;
     }
 
-    return samples_usec(arg, &s->sample_spec);
+    return pa_samples_usec(arg, &s->sample_spec);
 }
 
 int module_init(struct core *c, struct module*m) {
@@ -117,7 +117,7 @@ int module_init(struct core *c, struct module*m) {
     int fd = -1;
     int frag_size, in_frag_size, out_frag_size;
     int mode;
-    struct sample_spec ss;
+    struct pa_sample_spec ss;
     assert(c && m);
 
     p = m->argument ? m->argument : "/dev/dsp";
@@ -203,7 +203,7 @@ int module_init(struct core *c, struct module*m) {
 
     u->memchunk.memblock = NULL;
     u->memchunk.length = 0;
-    u->sample_size = sample_size(&ss);
+    u->sample_size = pa_sample_size(&ss);
 
     u->out_fragment_size = out_frag_size;
     u->in_fragment_size = in_frag_size;
index c0a903e9493a3794fac69423ac337203c9121e5a..9dcf5d2316ebe716ddcf7ced8de072df1252848a 100644 (file)
@@ -18,7 +18,8 @@ struct userdata {
     struct sink *sink;
     struct iochannel *io;
     struct core *core;
-    struct mainloop_source *mainloop_source;
+    void *mainloop_source;
+    struct pa_mainloop_api *mainloop;
 
     struct memchunk memchunk;
 };
@@ -27,7 +28,7 @@ static void do_write(struct userdata *u) {
     ssize_t r;
     assert(u);
 
-    mainloop_source_enable(u->mainloop_source, 0);
+    u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0);
         
     if (!iochannel_is_writable(u->io))
         return;
@@ -57,10 +58,10 @@ static void notify_cb(struct sink*s) {
     assert(s && u);
 
     if (iochannel_is_writable(u->io))
-        mainloop_source_enable(u->mainloop_source, 1);
+        u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 1);
 }
 
-static void prepare_callback(struct mainloop_source *src, void *userdata) {
+static void fixed_callback(struct pa_mainloop_api *m, void *id, void *userdata) {
     struct userdata *u = userdata;
     assert(u);
     do_write(u);
@@ -77,7 +78,7 @@ int module_init(struct core *c, struct module*m) {
     struct stat st;
     char *p;
     int fd = -1;
-    static const struct sample_spec ss = {
+    static const struct pa_sample_spec ss = {
         .format = SAMPLE_S16NE,
         .rate = 44100,
         .channels = 2,
@@ -120,10 +121,11 @@ int module_init(struct core *c, struct module*m) {
     u->memchunk.memblock = NULL;
     u->memchunk.length = 0;
 
-    u->mainloop_source = mainloop_source_new_fixed(c->mainloop, prepare_callback, u);
+    u->mainloop = c->mainloop;
+    u->mainloop_source = u->mainloop->source_fixed(u->mainloop, fixed_callback, u);
     assert(u->mainloop_source);
-    mainloop_source_enable(u->mainloop_source, 0);
-    
+    u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0);
+        
     m->userdata = u;
 
     return 0;
@@ -147,7 +149,7 @@ void module_done(struct core *c, struct module*m) {
         
     sink_free(u->sink);
     iochannel_free(u->io);
-    mainloop_source_free(u->mainloop_source);
+    u->mainloop->cancel_fixed(u->mainloop, u->mainloop_source);
 
     assert(u->filename);
     unlink(u->filename);
index c6de1751ab8f36479148127560b4a414c2b8d04d..883a22df9b8dd73a552a18c16e8a8db983f6314d 100644 (file)
@@ -151,5 +151,5 @@ void module_unload_request(struct core *c, struct module *m) {
     assert(i);
     i->core = c;
     i->index = m->index;
-    mainloop_once(c->mainloop, module_unload_once_callback, i);
+    pa_mainloop_api_once(c->mainloop, module_unload_once_callback, i);
 }
index 7b1315c0065c8548af081e05cd12f838469b4a31..02bf8cd1c8382af3d86243519bc9b3afa78630f8 100644 (file)
--- a/src/oss.c
+++ b/src/oss.c
@@ -7,7 +7,7 @@
 
 #include "oss.h"
 
-int oss_auto_format(int fd, struct sample_spec *ss) {
+int oss_auto_format(int fd, struct pa_sample_spec *ss) {
     int format, channels, speed;
 
     assert(fd >= 0 && ss);
index 35d2dd023553762938649a3b17e01656beb102b0..34ac9c669586875edef9556051e2ecde21c049ad 100644 (file)
--- a/src/oss.h
+++ b/src/oss.h
@@ -3,6 +3,6 @@
 
 #include "sample.h"
 
-int oss_auto_format(int fd, struct sample_spec *ss);
+int oss_auto_format(int fd, struct pa_sample_spec *ss);
 
 #endif
diff --git a/src/pacat.c b/src/pacat.c
new file mode 100644 (file)
index 0000000..5f5a373
--- /dev/null
@@ -0,0 +1,169 @@
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "polyp.h"
+#include "mainloop.h"
+
+static struct pa_context *context = NULL;
+static struct pa_stream *stream = NULL;
+static struct pa_mainloop_api *mainloop_api = NULL;
+
+static void *buffer = NULL;
+static size_t buffer_length = 0, buffer_index = 0;
+
+static void* stdin_source = NULL;
+
+static void context_die_callback(struct pa_context *c, void *userdata) {
+    assert(c);
+    fprintf(stderr, "Connection to server shut down, exiting.\n");
+    mainloop_api->quit(mainloop_api, 1);
+}
+
+static void stream_die_callback(struct pa_stream *s, void *userdata) {
+    assert(s);
+    fprintf(stderr, "Stream deleted, exiting.\n");
+    mainloop_api->quit(mainloop_api, 1);
+}
+
+static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) {
+    size_t l;
+    assert(s && length);
+    
+    mainloop_api->enable_io(mainloop_api, stdin_source, PA_STREAM_PLAYBACK);
+
+    if (!buffer)
+        return;
+    
+    assert(buffer_length);
+    
+    l = length;
+    if (l > buffer_length)
+        l = buffer_length;
+    
+    pa_stream_write(s, buffer+buffer_index, l);
+    buffer_length -= l;
+    buffer_index += l;
+    
+    if (!buffer_length) {
+        free(buffer);
+        buffer = NULL;
+        buffer_index = buffer_length = 0;
+    }
+}
+
+static void stream_complete_callback(struct pa_context*c, struct pa_stream *s, void *userdata) {
+    assert(c);
+
+    if (!s) {
+        fprintf(stderr, "Stream creation failed.\n");
+        mainloop_api->quit(mainloop_api, 1);
+    }
+
+    stream = s;
+    pa_stream_set_die_callback(stream, stream_die_callback, NULL);
+    pa_stream_set_write_callback(stream, stream_write_callback, NULL);
+}
+
+static void context_complete_callback(struct pa_context *c, int success, void *userdata) {
+    static const struct pa_sample_spec ss = {
+        .format = SAMPLE_S16NE,
+        .rate = 44100,
+        .channels = 2
+    };
+        
+    assert(c && !stream);
+
+    if (!success) {
+        fprintf(stderr, "Connection failed\n");
+        goto fail;
+    }
+    
+    if (pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL) < 0) {
+        fprintf(stderr, "pa_stream_new() failed.\n");
+        goto fail;
+    }
+
+    return;
+    
+fail:
+    mainloop_api->quit(mainloop_api, 1);
+}
+
+static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
+    size_t l;
+    ssize_t r;
+    assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT);
+
+    if (buffer) {
+        mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL);
+        return;
+    }
+
+    if (!(l = pa_stream_writable_size(stream)))
+        l = 4096;
+    buffer = malloc(l);
+    assert(buffer);
+    if ((r = read(fd, buffer, l)) <= 0) {
+        if (r == 0)
+            mainloop_api->quit(mainloop_api, 0);
+        else {
+            fprintf(stderr, "read() failed: %s\n", strerror(errno));
+            mainloop_api->quit(mainloop_api, 1);
+        }
+
+        return;
+    }
+
+    buffer_length = r;
+    buffer_index = 0;
+}
+
+int main(int argc, char *argv[]) {
+    struct pa_mainloop* m;
+    int ret = 1;
+
+    if (!(m = pa_mainloop_new())) {
+        fprintf(stderr, "pa_mainloop_new() failed.\n");
+        goto quit;
+    }
+
+    mainloop_api = pa_mainloop_get_api(m);
+
+    if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) {
+        fprintf(stderr, "source_io() failed.\n");
+        goto quit;
+    }
+    
+    if (!(context = pa_context_new(mainloop_api, argv[0]))) {
+        fprintf(stderr, "pa_context_new() failed.\n");
+        goto quit;
+    }
+
+    if (pa_context_connect(context, NULL, context_complete_callback, NULL) < 0) {
+        fprintf(stderr, "pa_context_connext() failed.\n");
+        goto quit;
+    }
+        
+    pa_context_set_die_callback(context, context_die_callback, NULL);
+
+    if (pa_mainloop_run(m, &ret) < 0) {
+        fprintf(stderr, "pa_mainloop_run() failed.\n");
+        goto quit;
+    }
+    
+quit:
+    if (stream)
+        pa_stream_free(stream);
+    if (context)
+        pa_context_free(context);
+    if (m)
+        pa_mainloop_free(m);
+    if (buffer)
+        free(buffer);
+    
+    return ret;
+}
index 47fce9194770111d5e521deacf7bb6b610cb4a3f..0f966d9a749b3ecc739d0669cd1dce6e71edeb8e 100644 (file)
@@ -16,7 +16,7 @@ struct packet* packet_new(size_t length) {
     return p;
 }
 
-struct packet* packet_dynamic(uint8_t* data, size_t length) {
+struct packet* packet_new_dynamic(uint8_t* data, size_t length) {
     struct packet *p;
     assert(data && length);
     p = malloc(sizeof(struct packet));
@@ -26,6 +26,7 @@ struct packet* packet_dynamic(uint8_t* data, size_t length) {
     p->length = length;
     p->data = data;
     p->type = PACKET_DYNAMIC;
+    return p;
 }
 
 struct packet* packet_ref(struct packet *p) {
diff --git a/src/pdispatch.c b/src/pdispatch.c
new file mode 100644 (file)
index 0000000..48b6639
--- /dev/null
@@ -0,0 +1,149 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include "pdispatch.h"
+#include "protocol-native-spec.h"
+
+struct reply_info {
+    struct pdispatch *pdispatch;
+    struct reply_info *next, *previous;
+    int (*callback)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+    void *userdata;
+    uint32_t tag;
+    void *mainloop_timeout;
+};
+
+struct pdispatch {
+    struct pa_mainloop_api *mainloop;
+    const struct pdispatch_command *command_table;
+    unsigned n_commands;
+    struct reply_info *replies;
+};
+
+static void reply_info_free(struct reply_info *r) {
+    assert(r && r->pdispatch && r->pdispatch->mainloop);
+    r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout);
+
+    if (r->previous)
+        r->previous->next = r->next;
+    else
+        r->pdispatch->replies = r->next;
+
+    if (r->next)
+        r->next->previous = r->previous;
+    
+    free(r);
+}
+
+struct pdispatch* pdispatch_new(struct pa_mainloop_api *mainloop, const struct pdispatch_command*table, unsigned entries) {
+    struct pdispatch *pd;
+    assert(mainloop);
+
+    assert((entries && table) || (!entries && !table));
+    
+    pd = malloc(sizeof(struct pdispatch));
+    assert(pd);
+    pd->mainloop = mainloop;
+    pd->command_table = table;
+    pd->n_commands = entries;
+    return pd;
+}
+
+void pdispatch_free(struct pdispatch *pd) {
+    assert(pd);
+    while (pd->replies)
+        reply_info_free(pd->replies);
+    free(pd);
+}
+
+int pdispatch_run(struct pdispatch *pd, struct packet*packet, void *userdata) {
+    uint32_t tag, command;
+    assert(pd && packet);
+    struct tagstruct *ts = NULL;
+    assert(pd && packet && packet->data);
+
+    if (packet->length <= 8)
+        goto fail;
+
+    ts = tagstruct_new(packet->data, packet->length);
+    assert(ts);
+    
+    if (tagstruct_getu32(ts, &command) < 0 ||
+        tagstruct_getu32(ts, &tag) < 0)
+        goto fail;
+
+    if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) {
+        struct reply_info *r;
+        int done = 0;
+
+        for (r = pd->replies; r; r = r->next) {
+            if (r->tag == tag) {
+                int ret = r->callback(r->pdispatch, command, tag, ts, r->userdata);
+                reply_info_free(r);
+                
+                if (ret < 0)
+                    goto fail;
+                
+                done = 1;
+                break;
+            }
+        }
+
+        if (!done)
+            goto fail;
+
+    } else if (pd->command_table && command < pd->n_commands) {
+        const struct pdispatch_command *c = pd->command_table+command;
+
+        if (!c->proc)
+            goto fail;
+        
+        if (c->proc(pd, command, tag, ts, userdata) < 0)
+            goto fail;
+    } else
+        goto fail;
+    
+    tagstruct_free(ts);    
+        
+    return 0;
+
+fail:
+    if (ts)
+        tagstruct_free(ts);    
+
+    fprintf(stderr, "protocol-native: invalid packet.\n");
+    return -1;
+}
+
+static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) {
+    struct reply_info*r = userdata;
+    assert (r && r->mainloop_timeout == id && r->pdispatch && r->pdispatch->mainloop == m && r->callback);
+
+    r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata);
+    reply_info_free(r);
+}
+
+void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata) {
+    struct reply_info *r;
+    struct timeval tv;
+    assert(pd && cb);
+
+    r = malloc(sizeof(struct reply_info));
+    assert(r);
+    r->pdispatch = pd;
+    r->callback = cb;
+    r->userdata = userdata;
+    r->tag = tag;
+
+    gettimeofday(&tv, NULL);
+    tv.tv_sec += timeout;
+
+    r->mainloop_timeout = pd->mainloop->source_time(pd->mainloop, &tv, timeout_callback, r);
+    assert(r->mainloop_timeout);
+
+    r->previous = NULL;
+    r->next = pd->replies;
+    if (r->next)
+        r->next->previous = r;
+    pd->replies = r;
+}
diff --git a/src/pdispatch.h b/src/pdispatch.h
new file mode 100644 (file)
index 0000000..466da9a
--- /dev/null
@@ -0,0 +1,22 @@
+#ifndef foopdispatchhfoo
+#define foopdispatchhfoo
+
+#include <inttypes.h>
+#include "tagstruct.h"
+#include "packet.h"
+#include "mainloop-api.h"
+
+struct pdispatch;
+
+struct pdispatch_command {
+    int (*proc)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+};
+
+struct pdispatch* pdispatch_new(struct pa_mainloop_api *m, const struct pdispatch_command*table, unsigned entries);
+void pdispatch_free(struct pdispatch *pd);
+
+int pdispatch_run(struct pdispatch *pd, struct packet*p, void *userdata);
+
+void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata);
+
+#endif
diff --git a/src/polyp.c b/src/polyp.c
new file mode 100644 (file)
index 0000000..fdff7f9
--- /dev/null
@@ -0,0 +1,451 @@
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "polyp.h"
+#include "protocol-native-spec.h"
+#include "pdispatch.h"
+#include "pstream.h"
+#include "dynarray.h"
+#include "socket-client.h"
+#include "pstream-util.h"
+
+#define DEFAULT_QUEUE_LENGTH 10240
+#define DEFAULT_MAX_LENGTH 20480
+#define DEFAULT_PREBUF 4096
+#define DEFAULT_TIMEOUT 5
+
+struct pa_context {
+    char *name;
+    struct pa_mainloop_api* mainloop;
+    struct socket_client *client;
+    struct pstream *pstream;
+    struct pdispatch *pdispatch;
+    struct dynarray *streams;
+    struct pa_stream *first_stream;
+    uint32_t ctag;
+    uint32_t errno;
+    enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_READY, CONTEXT_DEAD} state;
+
+    void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
+    void *connect_complete_userdata;
+
+    void (*die_callback)(struct pa_context*c, void *userdata);
+    void *die_userdata;
+};
+
+struct pa_stream {
+    struct pa_context *context;
+    struct pa_stream *next, *previous;
+    uint32_t channel;
+    int channel_valid;
+    enum pa_stream_direction direction;
+    enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
+    uint32_t requested_bytes;
+
+    void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
+    void *read_userdata;
+
+    void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
+    void *write_userdata;
+    
+    void (*create_complete_callback)(struct pa_context*c, struct pa_stream *s, void *userdata);
+    void *create_complete_userdata;
+    
+    void (*die_callback)(struct pa_stream*c, void *userdata);
+    void *die_userdata;
+};
+
+static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+
+static const struct pdispatch_command command_table[PA_COMMAND_MAX] = {
+    [PA_COMMAND_ERROR] = { NULL },
+    [PA_COMMAND_REPLY] = { NULL },
+    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
+    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
+    [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
+    [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
+    [PA_COMMAND_EXIT] = { NULL },
+    [PA_COMMAND_REQUEST] = { command_request },
+};
+
+struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
+    assert(mainloop && name);
+    struct pa_context *c;
+    c = malloc(sizeof(struct pa_context));
+    assert(c);
+    c->name = strdup(name);
+    c->mainloop = mainloop;
+    c->client = NULL;
+    c->pstream = NULL;
+    c->pdispatch = NULL;
+    c->streams = dynarray_new();
+    assert(c->streams);
+    c->first_stream = NULL;
+    c->errno = PA_ERROR_OK;
+    c->state = CONTEXT_UNCONNECTED;
+    c->ctag = 0;
+
+    c->connect_complete_callback = NULL;
+    c->connect_complete_userdata = NULL;
+
+    c->die_callback = NULL;
+    c->die_userdata = NULL;
+    
+    return c;
+}
+
+void pa_context_free(struct pa_context *c) {
+    assert(c);
+
+    while (c->first_stream)
+        pa_stream_free(c->first_stream);
+    
+    if (c->client)
+        socket_client_free(c->client);
+    if (c->pdispatch)
+        pdispatch_free(c->pdispatch);
+    if (c->pstream)
+        pstream_free(c->pstream);
+    if (c->streams)
+        dynarray_free(c->streams, NULL, NULL);
+        
+    free(c->name);
+    free(c);
+}
+
+static void stream_dead(struct pa_stream *s) {
+    if (s->state == STREAM_DEAD)
+        return;
+
+    s->state = STREAM_DEAD;
+    if (s->die_callback)
+        s->die_callback(s, s->die_userdata);
+}
+
+static void context_dead(struct pa_context *c) {
+    struct pa_stream *s;
+    assert(c);
+    
+    for (s = c->first_stream; s; s = s->next)
+        stream_dead(s);
+
+    if (c->state == CONTEXT_DEAD)
+        return;
+    
+    c->state = CONTEXT_DEAD;
+    if (c->die_callback)
+        c->die_callback(c, c->die_userdata);
+}
+
+static void pstream_die_callback(struct pstream *p, void *userdata) {
+    struct pa_context *c = userdata;
+    assert(p && c);
+
+    assert(c->state != CONTEXT_DEAD);
+    
+    c->state = CONTEXT_DEAD;
+
+    context_dead(c);
+}
+
+static int pstream_packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
+    struct pa_context *c = userdata;
+    assert(p && packet && c);
+
+    if (pdispatch_run(c->pdispatch, packet, c) < 0) {
+        fprintf(stderr, "polyp.c: invalid packet.\n");
+        return -1;
+    }
+
+    return 0;
+}
+
+static int pstream_memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) {
+    struct pa_context *c = userdata;
+    struct pa_stream *s;
+    assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
+
+    if (!(s = dynarray_get(c->streams, channel)))
+        return -1;
+
+    if (s->read_callback)
+        s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
+    
+    return 0;
+}
+
+static void on_connection(struct socket_client *client, struct iochannel*io, void *userdata) {
+    struct pa_context *c = userdata;
+    assert(client && io && c && c->state == CONTEXT_CONNECTING);
+
+    socket_client_free(client);
+    c->client = NULL;
+
+    if (!io) {
+        c->errno = PA_ERROR_CONNECTIONREFUSED;
+        c->state = CONTEXT_UNCONNECTED;
+
+        if (c->connect_complete_callback)
+            c->connect_complete_callback(c, 0, c->connect_complete_userdata);
+
+        return;
+    }
+    
+    c->pstream = pstream_new(c->mainloop, io);
+    assert(c->pstream);
+    pstream_set_die_callback(c->pstream, pstream_die_callback, c);
+    pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
+    pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
+    
+    c->pdispatch = pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
+    assert(c->pdispatch);
+
+    c->state = CONTEXT_READY;
+
+    if (c->connect_complete_callback)
+        c->connect_complete_callback(c, 1, c->connect_complete_userdata);
+}
+
+int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
+    assert(c && c->state == CONTEXT_UNCONNECTED);
+
+    assert(!c->client);
+    if (!(c->client = socket_client_new_unix(c->mainloop, server))) {
+        c->errno = PA_ERROR_CONNECTIONREFUSED;
+        return -1;
+    }
+
+    c->connect_complete_callback = complete;
+    c->connect_complete_userdata = userdata;
+    
+    socket_client_set_callback(c->client, on_connection, c);
+    c->state = CONTEXT_CONNECTING;
+
+    return 0;
+}
+
+int pa_context_is_dead(struct pa_context *c) {
+    assert(c);
+    return c->state == CONTEXT_DEAD;
+}
+
+int pa_context_is_ready(struct pa_context *c) {
+    assert(c);
+    return c->state == CONTEXT_READY;
+}
+
+int pa_context_errno(struct pa_context *c) {
+    assert(c);
+    return c->errno;
+}
+
+void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
+    assert(c);
+    c->die_callback = cb;
+    c->die_userdata = userdata;
+}
+
+static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+    struct pa_stream *s;
+    struct pa_context *c = userdata;
+    uint32_t bytes, channel;
+    assert(pd && command == PA_COMMAND_REQUEST && t &&  s);
+
+    if (tagstruct_getu32(t, &channel) < 0 ||
+        tagstruct_getu32(t, &bytes) < 0 ||
+        tagstruct_eof(t)) {
+        c->errno = PA_ERROR_PROTOCOL;
+        return -1;
+    }
+    
+    if (!(s = dynarray_get(c->streams, channel))) {
+        c->errno = PA_ERROR_PROTOCOL;
+        return -1;
+    }
+
+    s->requested_bytes += bytes;
+    
+    if (s->requested_bytes && s->write_callback)
+        s->write_callback(s, s->requested_bytes, s->write_userdata);
+
+    return 0;
+}
+
+static int create_playback_callback(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+    int ret = 0;
+    struct pa_stream *s = userdata;
+    assert(pd && s && s->state == STREAM_CREATING);
+
+    if (command != PA_COMMAND_REPLY) {
+        struct pa_context *c = s->context;
+        assert(c);
+
+        if (command == PA_COMMAND_ERROR && tagstruct_getu32(t, &s->context->errno) < 0) {
+            s->context->errno = PA_ERROR_PROTOCOL;
+            ret = -1;
+        } else if (command == PA_COMMAND_TIMEOUT) {
+            s->context->errno = PA_ERROR_TIMEOUT;
+            ret = -1;
+        }
+
+        goto fail;
+    }
+
+    if (tagstruct_getu32(t, &s->channel) < 0 ||
+        tagstruct_eof(t)) {
+        s->context->errno = PA_ERROR_PROTOCOL;
+        ret = -1;
+        goto fail;
+    }
+
+    s->channel_valid = 1;
+    dynarray_put(s->context->streams, s->channel, s);
+    
+    s->state = STREAM_READY;
+    assert(s->create_complete_callback);
+    s->create_complete_callback(s->context, s, s->create_complete_userdata);
+    return 0;
+
+fail:
+    assert(s->create_complete_callback);
+    s->create_complete_callback(s->context, NULL, s->create_complete_userdata);
+    pa_stream_free(s);
+    return ret;
+}
+
+int pa_stream_new(
+    struct pa_context *c,
+    enum pa_stream_direction dir,
+    const char *dev,
+    const char *name,
+    const struct pa_sample_spec *ss,
+    const struct pa_buffer_attr *attr,
+    void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata),
+    void *userdata) {
+    
+    struct pa_stream *s;
+    struct tagstruct *t;
+    uint32_t tag;
+
+    assert(c && name && ss && c->state == CONTEXT_READY && complete);
+    
+    s = malloc(sizeof(struct pa_stream));
+    assert(s);
+    s->context = c;
+
+    s->read_callback = NULL;
+    s->read_userdata = NULL;
+    s->write_callback = NULL;
+    s->write_userdata = NULL;
+    s->die_callback = NULL;
+    s->die_userdata = NULL;
+    s->create_complete_callback = complete;
+    s->create_complete_userdata = NULL;
+
+    s->state = STREAM_CREATING;
+    s->requested_bytes = 0;
+    s->channel = 0;
+    s->channel_valid = 0;
+    s->direction = dir;
+
+    t = tagstruct_new(NULL, 0);
+    assert(t);
+    
+    tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
+    tagstruct_putu32(t, tag = c->ctag++);
+    tagstruct_puts(t, name);
+    tagstruct_put_sample_spec(t, ss);
+    tagstruct_putu32(t, (uint32_t) -1);
+    tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH);
+    tagstruct_putu32(t, attr ? attr->max_length  : DEFAULT_MAX_LENGTH);
+    tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF);
+
+    pstream_send_tagstruct(c->pstream, t);
+
+    pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
+    
+    s->next = c->first_stream;
+    if (s->next)
+        s->next->previous = s;
+    s->previous = NULL;
+    c->first_stream = s;
+
+    return 0;
+}
+
+void pa_stream_free(struct pa_stream *s) {
+    assert(s && s->context);
+    
+    if (s->channel_valid) {
+        struct tagstruct *t = tagstruct_new(NULL, 0);
+        assert(t);
+    
+        tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM);
+        tagstruct_putu32(t, s->context->ctag++);
+        tagstruct_putu32(t, s->channel);
+        pstream_send_tagstruct(s->context->pstream, t);
+    }
+    
+    if (s->channel_valid)
+        dynarray_put(s->context->streams, s->channel, NULL);
+
+    if (s->next)
+        s->next->previous = s->previous;
+    if (s->previous)
+        s->previous->next = s->next;
+    else
+        s->context->first_stream = s->next;
+    
+    free(s);
+}
+
+void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
+    assert(s && cb);
+    s->write_callback = cb;
+    s->write_userdata = userdata;
+}
+
+void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
+    struct memchunk chunk;
+    assert(s && s->context && data && length);
+
+    chunk.memblock = memblock_new(length);
+    assert(chunk.memblock && chunk.memblock->data);
+    memcpy(chunk.memblock->data, data, length);
+    chunk.index = 0;
+    chunk.length = length;
+
+    pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
+
+    if (length < s->requested_bytes)
+        s->requested_bytes -= length;
+    else
+        s->requested_bytes = 0;
+}
+
+size_t pa_stream_writable_size(struct pa_stream *s) {
+    assert(s);
+    return s->requested_bytes;
+}
+
+void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
+    assert(s && cb);
+    s->read_callback = cb;
+    s->read_userdata = userdata;
+}
+
+int pa_stream_is_dead(struct pa_stream *s) {
+    return s->state == STREAM_DEAD;
+}
+
+int pa_stream_is_ready(struct pa_stream*s) {
+    return s->state == STREAM_READY;
+}
+
+void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
+    assert(s);
+    s->die_callback = cb;
+    s->die_userdata = userdata;
+}
diff --git a/src/polyp.h b/src/polyp.h
new file mode 100644 (file)
index 0000000..171e3bd
--- /dev/null
@@ -0,0 +1,53 @@
+#ifndef foopolyphfoo
+#define foopolyphfoo
+
+#include <sys/types.h>
+
+#include "sample.h"
+#include "polypdef.h"
+#include "mainloop-api.h"
+
+struct pa_context;
+
+struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name);
+
+int pa_context_connect(
+    struct pa_context *c,
+    const char *server,
+    void (*complete) (struct pa_context*c, int success, void *userdata),
+    void *userdata);
+
+void pa_context_free(struct pa_context *c);
+
+void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata);
+
+int pa_context_is_dead(struct pa_context *c);
+int pa_context_is_ready(struct pa_context *c);
+int pa_contect_errno(struct pa_context *c);
+
+struct pa_stream;
+
+int pa_stream_new(
+    struct pa_context *c,
+    enum pa_stream_direction dir,
+    const char *dev,
+    const char *name,
+    const struct pa_sample_spec *ss,
+    const struct pa_buffer_attr *attr,
+    void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata),
+    void *userdata);
+
+void pa_stream_free(struct pa_stream *p);
+
+void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata);
+
+void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata);
+void pa_stream_write(struct pa_stream *p, const void *data, size_t length);
+size_t pa_stream_writable_size(struct pa_stream *p);
+
+void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata);
+
+int pa_stream_is_dead(struct pa_stream *p);
+int pa_stream_is_ready(struct pa_stream*p);
+
+#endif
diff --git a/src/polypdef.h b/src/polypdef.h
new file mode 100644 (file)
index 0000000..aa6e6bf
--- /dev/null
@@ -0,0 +1,18 @@
+#ifndef foopolypdefhfoo
+#define foopolypdefhfoo
+
+#include <inttypes.h>
+
+enum pa_stream_direction {
+    PA_STREAM_PLAYBACK,
+    PA_STREAM_RECORD
+};
+
+struct pa_buffer_attr {
+    uint32_t queue_length;
+    uint32_t max_length;
+    uint32_t prebuf;
+};
+
+
+#endif
index c0c93d981f69535f35b7c5c9a2cb1b81ca391479..b6460fecfd759b5c604a3cf894a110777497ef66 100644 (file)
@@ -12,8 +12,7 @@ struct protocol_cli {
 
 static void cli_eof_cb(struct cli*c, void*userdata) {
     struct protocol_cli *p = userdata;
-    assert(c && p);
-
+    assert(p);
     idxset_remove_by_data(p->connections, c, NULL);
     cli_free(c);
 }
@@ -22,7 +21,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
     struct protocol_cli *p = userdata;
     struct cli *c;
     assert(s && io && p);
-    
+
     c = cli_new(p->core, io);
     assert(c);
     cli_set_eof_callback(c, cli_eof_cb, p);
diff --git a/src/protocol-native-spec.h b/src/protocol-native-spec.h
new file mode 100644 (file)
index 0000000..df11ae3
--- /dev/null
@@ -0,0 +1,29 @@
+#ifndef fooprotocolnativespech
+#define fooprotocolnativespech
+
+enum {
+    PA_COMMAND_ERROR,
+    PA_COMMAND_TIMEOUT, /* pseudo command */
+    PA_COMMAND_REPLY,
+    PA_COMMAND_CREATE_PLAYBACK_STREAM,
+    PA_COMMAND_DELETE_PLAYBACK_STREAM,
+    PA_COMMAND_CREATE_RECORD_STREAM,
+    PA_COMMAND_DELETE_RECORD_STREAM,
+    PA_COMMAND_EXIT,
+    PA_COMMAND_REQUEST,
+    PA_COMMAND_MAX
+};
+
+enum {
+    PA_ERROR_OK,
+    PA_ERROR_ACCESS,
+    PA_ERROR_COMMAND,
+    PA_ERROR_INVALID,
+    PA_ERROR_EXIST,
+    PA_ERROR_NOENTITY,
+    PA_ERROR_CONNECTIONREFUSED,
+    PA_ERROR_PROTOCOL,
+    PA_ERROR_TIMEOUT
+};
+
+#endif
index e9cca7c180430fb55dbc469bda6586763d8ced9c..a39880b89c0b7e50bf80e7c4c0ca2b115f921812 100644 (file)
@@ -3,34 +3,19 @@
 #include <stdlib.h>
 
 #include "protocol-native.h"
+#include "protocol-native-spec.h"
 #include "packet.h"
 #include "client.h"
 #include "sourceoutput.h"
 #include "sinkinput.h"
 #include "pstream.h"
 #include "tagstruct.h"
+#include "pdispatch.h"
+#include "pstream-util.h"
 
 struct connection;
 struct protocol_native;
 
-enum {
-    COMMAND_ERROR,
-    COMMAND_REPLY,
-    COMMAND_CREATE_PLAYBACK_STREAM,
-    COMMAND_DELETE_PLAYBACK_STREAM,
-    COMMAND_CREATE_RECORD_STREAM,
-    COMMAND_DELETE_RECORD_STREAM,
-    COMMAND_EXIT,
-    COMMAND_MAX
-};
-
-enum {
-    ERROR_ACCESS,
-    ERROR_COMMAND,
-    ERROR_ARGUMENT,
-    ERROR_EXIST
-};
-
 struct record_stream {
     struct connection *connection;
     uint32_t index;
@@ -41,6 +26,7 @@ struct record_stream {
 struct playback_stream {
     struct connection *connection;
     uint32_t index;
+    size_t qlength;
     struct sink_input *sink_input;
     struct memblockq *memblockq;
 };
@@ -50,6 +36,7 @@ struct connection {
     struct protocol_native *protocol;
     struct client *client;
     struct pstream *pstream;
+    struct pdispatch *pdispatch;
     struct idxset *record_streams, *playback_streams;
 };
 
@@ -60,6 +47,29 @@ struct protocol_native {
     struct idxset *connections;
 };
 
+static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk);
+static void sink_input_drop_cb(struct sink_input *i, size_t length);
+static void sink_input_kill_cb(struct sink_input *i);
+static uint32_t sink_input_get_latency_cb(struct sink_input *i);
+
+static void request_bytes(struct playback_stream*s);
+
+static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
+
+static const struct pdispatch_command command_table[PA_COMMAND_MAX] = {
+    [PA_COMMAND_ERROR] = { NULL },
+    [PA_COMMAND_REPLY] = { NULL },
+    [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
+    [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream },
+    [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
+    [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
+    [PA_COMMAND_EXIT] = { command_exit },
+};
+
+/* structure management */
+
 static void record_stream_free(struct record_stream* r) {
     assert(r && r->connection);
 
@@ -69,18 +79,28 @@ static void record_stream_free(struct record_stream* r) {
     free(r);
 }
 
-static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct sample_spec *ss, const char *name, size_t maxlength, size_t prebuf) {
+static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) {
     struct playback_stream *s;
+    assert(c && sink && s && name && qlen && maxlength && prebuf);
 
     s = malloc(sizeof(struct playback_stream));
     assert (s);
     s->connection = c;
+    s->qlength = qlen;
+    
     s->sink_input = sink_input_new(sink, ss, name);
     assert(s->sink_input);
-    s->memblockq = memblockq_new(maxlength, sample_size(ss), prebuf);
+    s->sink_input->peek = sink_input_peek_cb;
+    s->sink_input->drop = sink_input_drop_cb;
+    s->sink_input->kill = sink_input_kill_cb;
+    s->sink_input->get_latency = sink_input_get_latency_cb;
+    s->sink_input->userdata = s;
+    
+    s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf);
     assert(s->memblockq);
 
     idxset_put(c->playback_streams, s, &s->index);
+    request_bytes(s);
     return s;
 }
 
@@ -99,7 +119,6 @@ static void connection_free(struct connection *c) {
     assert(c && c->protocol);
 
     idxset_remove_by_data(c->protocol->connections, c, NULL);
-    pstream_free(c->pstream);
     while ((r = idxset_first(c->record_streams, NULL)))
         record_stream_free(r);
     idxset_free(c->record_streams, NULL, NULL);
@@ -108,67 +127,90 @@ static void connection_free(struct connection *c) {
         playback_stream_free(p);
     idxset_free(c->playback_streams, NULL, NULL);
 
+    pdispatch_free(c->pdispatch);
+    pstream_free(c->pstream);
     client_free(c->client);
     free(c);
 }
 
-/*** pstream callbacks ***/
+static void request_bytes(struct playback_stream *s) {
+    struct tagstruct *t;
+    size_t l;
+    assert(s);
 
-static void send_tagstruct(struct pstream *p, struct tagstruct *t) {
-    size_t length;
-    uint8_t *data;
-    struct packet *packet;
-    assert(p && t);
-
-    data = tagstruct_free_data(t, &length);
-    assert(data && length);
-    packet = packet_new_dynamic(data, length);
-    assert(packet);
-    pstream_send_packet(p, packet);
-    packet_unref(packet);
-}
+    if (!(l = memblockq_missing_to(s->memblockq, s->qlength)))
+        return;
 
-static void send_error(struct pstream *p, uint32_t tag, uint32_t error) {
-    struct tagstruct *t = tagstruct_new(NULL, 0);
+    t = tagstruct_new(NULL, 0);
     assert(t);
-    tagstruct_putu32(t, COMMAND_ERROR);
-    tagstruct_putu32(t, tag);
-    tagstruct_putu32(t, error);
-    send_tagstruct(p, t);
+    tagstruct_putu32(t, PA_COMMAND_REQUEST);
+    tagstruct_putu32(t, s->index);
+    tagstruct_putu32(t, l);
+    pstream_send_tagstruct(s->connection->pstream, t);
 }
 
-static void send_simple_ack(struct pstream *p, uint32_t tag) {
-    struct tagstruct *t = tagstruct_new(NULL, 0);
-    assert(t);
-    tagstruct_putu32(t, COMMAND_REPLY);
-    tagstruct_putu32(t, tag);
-    send_tagstruct(p, t);
+/*** sinkinput callbacks ***/
+
+static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) {
+    struct playback_stream *s;
+    assert(i && i->userdata && chunk);
+    s = i->userdata;
+
+    if (memblockq_peek(s->memblockq, chunk) < 0)
+        return -1;
+
+    return 0;
 }
 
-struct command {
-    int (*func)(struct connection *c, uint32_t tag, struct tagstruct *t);
-};
+static void sink_input_drop_cb(struct sink_input *i, size_t length) {
+    struct playback_stream *s;
+    assert(i && i->userdata && length);
+    s = i->userdata;
 
-static int command_create_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) {
+    memblockq_drop(s->memblockq, length);
+    request_bytes(s);
+}
+
+static void sink_input_kill_cb(struct sink_input *i) {
     struct playback_stream *s;
-    size_t maxlength, prebuf;
+    assert(i && i->userdata);
+    s = i->userdata;
+
+    playback_stream_free(s);
+}
+
+static uint32_t sink_input_get_latency_cb(struct sink_input *i) {
+    struct playback_stream *s;
+    assert(i && i->userdata);
+    s = i->userdata;
+
+    return pa_samples_usec(memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
+}
+
+/*** pdispatch callbacks ***/
+
+static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+    struct connection *c = userdata;
+    struct playback_stream *s;
+    size_t maxlength, prebuf, qlength;
     uint32_t sink_index;
     const char *name;
-    struct sample_spec ss;
+    struct pa_sample_spec ss;
     struct tagstruct *reply;
     struct sink *sink;
     assert(c && t && c->protocol && c->protocol->core);
     
     if (tagstruct_gets(t, &name) < 0 ||
         tagstruct_get_sample_spec(t, &ss) < 0 ||
-        tagstruct_getu32(t, &sink_index) < 0 || 
+        tagstruct_getu32(t, &sink_index) < 0 ||
+        tagstruct_getu32(t, &qlength) < 0 ||
         tagstruct_getu32(t, &maxlength) < 0 ||
         tagstruct_getu32(t, &prebuf) < 0 ||
         !tagstruct_eof(t))
         return -1;
 
     if (!c->authorized) {
-        send_error(c->pstream, tag, ERROR_ACCESS);
+        pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
         return 0;
     }
 
@@ -178,25 +220,28 @@ static int command_create_playback_stream(struct connection *c, uint32_t tag, st
         sink = idxset_get_by_index(c->protocol->core->sinks, sink_index);
 
     if (!sink) {
-        send_error(c->pstream, tag, ERROR_EXIST);
+        pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
         return 0;
     }
     
-    if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, prebuf))) {
-        send_error(c->pstream, tag, ERROR_ARGUMENT);
+    if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) {
+        pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
         return 0;
     }
     
     reply = tagstruct_new(NULL, 0);
     assert(reply);
-    tagstruct_putu32(reply, COMMAND_REPLY);
+    tagstruct_putu32(reply, PA_COMMAND_REPLY);
     tagstruct_putu32(reply, tag);
     tagstruct_putu32(reply, s->index);
-    send_tagstruct(c->pstream, reply);
+    assert(s->sink_input);
+    tagstruct_putu32(reply, s->sink_input->index);
+    pstream_send_tagstruct(c->pstream, reply);
     return 0;
 }
 
-static int command_delete_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) {
+static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+    struct connection *c = userdata;
     uint32_t channel;
     struct playback_stream *s;
     assert(c && t);
@@ -206,78 +251,50 @@ static int command_delete_playback_stream(struct connection *c, uint32_t tag, st
         return -1;
 
     if (!c->authorized) {
-        send_error(c->pstream, tag, ERROR_ACCESS);
+        pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
         return 0;
     }
     
     if (!(s = idxset_get_by_index(c->playback_streams, channel))) {
-        send_error(c->pstream, tag, ERROR_EXIST);
+        pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
         return 0;
     }
 
-    send_simple_ack(c->pstream, tag);
+    pstream_send_simple_ack(c->pstream, tag);
     return 0;
 }
 
-static int command_exit(struct connection *c, uint32_t tag, struct tagstruct *t) {
+static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
+    struct connection *c = userdata;
     assert(c && t);
     
     if (!tagstruct_eof(t))
         return -1;
 
     if (!c->authorized) {
-        send_error(c->pstream, tag, ERROR_ACCESS);
+        pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
         return 0;
     }
     
-    assert(c->protocol && c->protocol->core);
-    mainloop_quit(c->protocol->core->mainloop, -1);
-    send_simple_ack(c->pstream, tag); /* nonsense */
+    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
+    c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
+    pstream_send_simple_ack(c->pstream, tag); /* nonsense */
     return 0;
 }
 
-static const struct command commands[] = {
-    [COMMAND_ERROR] = { NULL },
-    [COMMAND_REPLY] = { NULL },
-    [COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
-    [COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream },
-    [COMMAND_CREATE_RECORD_STREAM] = { NULL },
-    [COMMAND_DELETE_RECORD_STREAM] = { NULL },
-    [COMMAND_EXIT] = { command_exit },
-};
+/*** pstream callbacks ***/
+
 
 static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
     struct connection *c = userdata;
-    uint32_t tag, command;
-    struct tagstruct *ts = NULL;
     assert(p && packet && packet->data && c);
 
-    if (packet->length <= 8)
-        goto fail;
-
-    ts = tagstruct_new(packet->data, packet->length);
-    assert(ts);
-
-    if (tagstruct_getu32(ts, &command) < 0 ||
-        tagstruct_getu32(ts, &tag) < 0)
-        goto fail;
-
-    if (command >= COMMAND_MAX || !commands[command].func)
-        send_error(p, tag, ERROR_COMMAND);
-    else if (commands[command].func(c, tag, ts) < 0)
-        goto fail;
+    if (pdispatch_run(c->pdispatch, packet, c) < 0) {
+        fprintf(stderr, "protocol-native: invalid packet.\n");
+        return -1;
+    }
     
-    tagstruct_free(ts);    
-        
     return 0;
-
-fail:
-    if (ts)
-        tagstruct_free(ts);    
-
-    fprintf(stderr, "protocol-native: invalid packet.\n");
-    return -1;
-    
 }
 
 static int memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) {
@@ -326,6 +343,9 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
     pstream_set_recieve_memblock_callback(c->pstream, memblock_callback, c);
     pstream_set_die_callback(c->pstream, die_callback, c);
 
+    c->pdispatch = pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
+    assert(c->pdispatch);
+
     c->record_streams = idxset_new(NULL, NULL);
     c->playback_streams = idxset_new(NULL, NULL);
     assert(c->record_streams && c->playback_streams);
index 8e4246cd735b56735a776c04c20c0bc9acf08a43..c8c458548a3286a266a8bf0179ce0fdb31caab1b 100644 (file)
@@ -9,6 +9,7 @@
 #include "sourceoutput.h"
 #include "protocol-simple.h"
 #include "client.h"
+#include "sample-util.h"
 
 struct connection {
     struct protocol_simple *protocol;
@@ -115,9 +116,10 @@ static int do_write(struct connection *c) {
 /*** sink_input callbacks ***/
 
 static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) {
-    struct connection*c = i->userdata;
-    assert(i && c && chunk);
-
+    struct connection*c;
+    assert(i && i->userdata && chunk);
+    c = i->userdata;
+    
     if (memblockq_peek(c->input_memblockq, chunk) < 0)
         return -1;
 
@@ -143,7 +145,7 @@ static void sink_input_kill_cb(struct sink_input *i) {
 static uint32_t sink_input_get_latency_cb(struct sink_input *i) {
     struct connection*c = i->userdata;
     assert(i && c);
-    return samples_usec(memblockq_get_length(c->input_memblockq), &DEFAULT_SAMPLE_SPEC);
+    return pa_samples_usec(memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
 }
 
 /*** source_output callbacks ***/
@@ -185,6 +187,7 @@ static void io_callback(struct iochannel*io, void *userdata) {
 static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) {
     struct protocol_simple *p = userdata;
     struct connection *c = NULL;
+    char cname[256];
     assert(s && io && p);
 
     c = malloc(sizeof(struct connection));
@@ -195,7 +198,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
     c->input_memblockq = c->output_memblockq = NULL;
     c->protocol = p;
 
-    c->client = client_new(p->core, "SIMPLE", "Client");
+    iochannel_peer_to_string(io, cname, sizeof(cname));
+    c->client = client_new(p->core, "SIMPLE", cname);
     assert(c->client);
     c->client->kill = client_kill_cb;
     c->client->userdata = c;
@@ -215,8 +219,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
         c->source_output->kill = source_output_kill_cb;
         c->source_output->userdata = c;
 
-        l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */
-        c->output_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
+        l = 5*pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */
+        c->output_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
     }
 
     if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) {
@@ -236,8 +240,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
         c->sink_input->get_latency = sink_input_get_latency_cb;
         c->sink_input->userdata = c;
 
-        l = bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */
-        c->input_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
+        l = pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */
+        c->input_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
     }
 
 
diff --git a/src/pstream-util.c b/src/pstream-util.c
new file mode 100644 (file)
index 0000000..2fab2b6
--- /dev/null
@@ -0,0 +1,35 @@
+#include <assert.h>
+
+#include "protocol-native-spec.h"
+#include "pstream-util.h"
+
+void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t) {
+    size_t length;
+    uint8_t *data;
+    struct packet *packet;
+    assert(p && t);
+
+    data = tagstruct_free_data(t, &length);
+    assert(data && length);
+    packet = packet_new_dynamic(data, length);
+    assert(packet);
+    pstream_send_packet(p, packet);
+    packet_unref(packet);
+}
+
+void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error) {
+    struct tagstruct *t = tagstruct_new(NULL, 0);
+    assert(t);
+    tagstruct_putu32(t, PA_COMMAND_ERROR);
+    tagstruct_putu32(t, tag);
+    tagstruct_putu32(t, error);
+    pstream_send_tagstruct(p, t);
+}
+
+void pstream_send_simple_ack(struct pstream *p, uint32_t tag) {
+    struct tagstruct *t = tagstruct_new(NULL, 0);
+    assert(t);
+    tagstruct_putu32(t, PA_COMMAND_REPLY);
+    tagstruct_putu32(t, tag);
+    pstream_send_tagstruct(p, t);
+}
diff --git a/src/pstream-util.h b/src/pstream-util.h
new file mode 100644 (file)
index 0000000..4e64a95
--- /dev/null
@@ -0,0 +1,14 @@
+#ifndef foopstreamutilhfoo
+#define foopstreamutilhfoo
+
+#include <inttypes.h>
+#include "pstream.h"
+#include "tagstruct.h"
+
+/* The tagstruct is freed!*/
+void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t);
+
+void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error);
+void pstream_send_simple_ack(struct pstream *p, uint32_t tag);
+
+#endif
index a63e126d1c4364650120fb74d950053f219abac6..4a3a648b10dc594238b2b5a00c9405b3af2f0e03 100644 (file)
@@ -30,7 +30,7 @@ struct item_info {
 };
 
 struct pstream {
-    struct mainloop *mainloop;
+    struct pa_mainloop_api *mainloop;
     struct mainloop_source *mainloop_source;
     struct iochannel *io;
     struct queue *send_queue;
@@ -70,18 +70,24 @@ static void do_read(struct pstream *p);
 static void io_callback(struct iochannel*io, void *userdata) {
     struct pstream *p = userdata;
     assert(p && p->io == io);
+
+    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
+    
     do_write(p);
     do_read(p);
 }
 
-static void prepare_callback(struct mainloop_source *s, void*userdata) {
+static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) {
     struct pstream *p = userdata;
-    assert(p && p->mainloop_source == s);
+    assert(p && p->mainloop_source == id && p->mainloop == m);
+
+    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
+    
     do_write(p);
     do_read(p);
 }
 
-struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
+struct pstream *pstream_new(struct pa_mainloop_api *m, struct iochannel *io) {
     struct pstream *p;
     assert(io);
 
@@ -96,8 +102,8 @@ struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
     p->die_callback_userdata = NULL;
 
     p->mainloop = m;
-    p->mainloop_source = mainloop_source_new_fixed(m, prepare_callback, p);
-    mainloop_source_enable(p->mainloop_source, 0);
+    p->mainloop_source = m->source_fixed(m, fixed_callback, p);
+    m->enable_fixed(m, p->mainloop_source, 0);
     
     p->send_queue = queue_new();
     assert(p->send_queue);
@@ -152,7 +158,7 @@ void pstream_free(struct pstream *p) {
     if (p->read.packet)
         packet_unref(p->read.packet);
 
-    mainloop_source_free(p->mainloop_source);
+    p->mainloop->cancel_fixed(p->mainloop, p->mainloop_source);
     free(p);
 }
 
@@ -173,7 +179,7 @@ void pstream_send_packet(struct pstream*p, struct packet *packet) {
     i->packet = packet_ref(packet);
 
     queue_push(p->send_queue, i);
-    mainloop_source_enable(p->mainloop_source, 1);
+    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);
 }
 
 void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) {
@@ -190,7 +196,7 @@ void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, st
     memblock_ref(i->chunk.memblock);
 
     queue_push(p->send_queue, i);
-    mainloop_source_enable(p->mainloop_source, 1);
+    p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);
 }
 
 void pstream_set_recieve_packet_callback(struct pstream *p, int (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) {
@@ -219,7 +225,7 @@ static void prepare_next_write_item(struct pstream *p) {
         assert(p->write.current->packet);
         p->write.data = p->write.current->packet->data;
         p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
-        p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0;
+        p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
         p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0;
     } else {
         assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
@@ -236,8 +242,6 @@ static void do_write(struct pstream *p) {
     ssize_t r;
     assert(p);
 
-    mainloop_source_enable(p->mainloop_source, 0);
-
     if (p->dead || !iochannel_is_writable(p->io))
         return;
     
@@ -285,8 +289,6 @@ static void do_read(struct pstream *p) {
     ssize_t r;
     assert(p);
 
-    mainloop_source_enable(p->mainloop_source, 0);
-    
     if (p->dead || !iochannel_is_readable(p->io))
         return;
 
@@ -313,7 +315,7 @@ static void do_read(struct pstream *p) {
         
         assert(!p->read.packet && !p->read.memblock);
 
-        if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == 0) {
+        if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
             /* Frame is a packet frame */
             p->read.packet = packet_new(ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]));
             assert(p->read.packet);
@@ -331,7 +333,7 @@ static void do_read(struct pstream *p) {
         if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
             size_t l;
 
-            l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r;
+            l = (p->read.index - r) < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
                 
             if (l > 0) {
                 struct memchunk chunk;
index 7113681ea58f19c17ad358eae4195d0e7f151af0..d418908e0d45d00183fac4ae6f9d664bb779a29c 100644 (file)
@@ -6,10 +6,11 @@
 #include "packet.h"
 #include "memblock.h"
 #include "iochannel.h"
+#include "mainloop-api.h"
 
 struct pstream;
 
-struct pstream* pstream_new(struct mainloop *m, struct iochannel *io);
+struct pstream* pstream_new(struct pa_mainloop_api *m, struct iochannel *io);
 void pstream_free(struct pstream*p);
 
 void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata);
index 90823ae6fb7a14b513a7d84d0f6f449e166c9f43..5c2e7a67d9ba0ecaf119e59e5ff90ee41191b914 100644 (file)
@@ -73,5 +73,12 @@ void* queue_pop(struct queue *q) {
     p = e->data;
     free(e);
 
+    q->length--;
+    
     return p;
 }
+
+int queue_is_empty(struct queue *q) {
+    assert(q);
+    return q->length == 0;
+}
diff --git a/src/sample-util.c b/src/sample-util.c
new file mode 100644 (file)
index 0000000..7a3c267
--- /dev/null
@@ -0,0 +1,88 @@
+#include <string.h>
+#include <assert.h>
+
+#include "sample-util.h"
+
+struct pa_sample_spec default_sample_spec = {
+    .format = SAMPLE_S16NE,
+    .rate = 44100,
+    .channels = 2
+};
+
+struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec) {
+    assert(b && b->data && spec);
+    memblock_assert_exclusive(b);
+    silence_memory(b->data, b->length, spec);
+    return b;
+}
+
+void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec) {
+    assert(c && c->memblock && c->memblock->data && spec && c->length);
+    memblock_assert_exclusive(c->memblock);
+    silence_memory(c->memblock->data+c->index, c->length, spec);
+}
+
+void silence_memory(void *p, size_t length, struct pa_sample_spec *spec) {
+    char c = 0;
+    assert(p && length && spec);
+
+    switch (spec->format) {
+        case SAMPLE_U8:
+            c = 127;
+            break;
+        case SAMPLE_S16LE:
+        case SAMPLE_S16BE:
+        case SAMPLE_FLOAT32:
+            c = 0;
+            break;
+        case SAMPLE_ALAW:
+        case SAMPLE_ULAW:
+            c = 80;
+            break;
+    }
+                
+    memset(p, c, length);
+}
+
+size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume) {
+    unsigned c, d;
+    assert(channels && data && length && spec);
+    assert(spec->format == SAMPLE_S16NE);
+
+    for (d = 0;; d += sizeof(int16_t)) {
+        int32_t sum = 0;
+
+        if (d >= length)
+            return d;
+        
+        for (c = 0; c < nchannels; c++) {
+            int32_t v;
+            uint8_t volume = channels[c].volume;
+            
+            if (d >= channels[c].chunk.length)
+                return d;
+
+            if (volume == 0)
+                v = 0;
+            else {
+                v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d));
+
+                if (volume != 0xFF)
+                    v = v*volume/0xFF;
+            }
+
+            sum += v;
+        }
+
+        if (volume == 0)
+            sum = 0;
+        else if (volume != 0xFF)
+            sum = sum*volume/0xFF;
+        
+        if (sum < -0x8000) sum = -0x8000;
+        if (sum > 0x7FFF) sum = 0x7FFF;
+        
+        *((int16_t*) data) = sum;
+        data += sizeof(int16_t);
+    }
+}
diff --git a/src/sample-util.h b/src/sample-util.h
new file mode 100644 (file)
index 0000000..0a3f7c8
--- /dev/null
@@ -0,0 +1,23 @@
+#ifndef foosampleutilhfoo
+#define foosampleutilhfoo
+
+#include "sample.h"
+#include "memblock.h"
+
+#define DEFAULT_SAMPLE_SPEC default_sample_spec
+
+extern struct pa_sample_spec default_sample_spec;
+
+struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec);
+void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec);
+void silence_memory(void *p, size_t length, struct pa_sample_spec *spec);
+
+struct mix_info {
+    struct memchunk chunk;
+    uint8_t volume;
+    void *userdata;
+};
+
+size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume);
+
+#endif
index c270f255dffd9db32616ff977f8a65d8b4da4ec9..2454630cf716684f920f843a608c0911ef969ccc 100644 (file)
@@ -1,50 +1,8 @@
-#include <string.h>
 #include <assert.h>
 
 #include "sample.h"
 
-struct sample_spec default_sample_spec = {
-    .format = SAMPLE_S16NE,
-    .rate = 44100,
-    .channels = 2
-};
-
-struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec) {
-    assert(b && b->data && spec);
-    memblock_assert_exclusive(b);
-    silence_memory(b->data, b->length, spec);
-    return b;
-}
-
-void silence_memchunk(struct memchunk *c, struct sample_spec *spec) {
-    assert(c && c->memblock && c->memblock->data && spec && c->length);
-    memblock_assert_exclusive(c->memblock);
-    silence_memory(c->memblock->data+c->index, c->length, spec);
-}
-
-void silence_memory(void *p, size_t length, struct sample_spec *spec) {
-    char c = 0;
-    assert(p && length && spec);
-
-    switch (spec->format) {
-        case SAMPLE_U8:
-            c = 127;
-            break;
-        case SAMPLE_S16LE:
-        case SAMPLE_S16BE:
-        case SAMPLE_FLOAT32:
-            c = 0;
-            break;
-        case SAMPLE_ALAW:
-        case SAMPLE_ULAW:
-            c = 80;
-            break;
-    }
-                
-    memset(p, c, length);
-}
-
-size_t sample_size(struct sample_spec *spec) {
+size_t pa_sample_size(struct pa_sample_spec *spec) {
     assert(spec);
     size_t b = 1;
 
@@ -66,56 +24,14 @@ size_t sample_size(struct sample_spec *spec) {
     return b * spec->channels;
 }
 
-size_t bytes_per_second(struct sample_spec *spec) {
+size_t pa_bytes_per_second(struct pa_sample_spec *spec) {
     assert(spec);
-    return spec->rate*sample_size(spec);
+    return spec->rate*pa_sample_size(spec);
 }
 
-size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume) {
-    unsigned c, d;
-    assert(channels && data && length && spec);
-    assert(spec->format == SAMPLE_S16NE);
-
-    for (d = 0;; d += sizeof(int16_t)) {
-        int32_t sum = 0;
-
-        if (d >= length)
-            return d;
-        
-        for (c = 0; c < nchannels; c++) {
-            int32_t v;
-            uint8_t volume = channels[c].volume;
-            
-            if (d >= channels[c].chunk.length)
-                return d;
-
-            if (volume == 0)
-                v = 0;
-            else {
-                v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d));
-
-                if (volume != 0xFF)
-                    v = v*volume/0xFF;
-            }
-
-            sum += v;
-        }
-
-        if (volume == 0)
-            sum = 0;
-        else if (volume != 0xFF)
-            sum = sum*volume/0xFF;
-        
-        if (sum < -0x8000) sum = -0x8000;
-        if (sum > 0x7FFF) sum = 0x7FFF;
-        
-        *((int16_t*) data) = sum;
-        data += sizeof(int16_t);
-    }
-}
 
-uint32_t samples_usec(size_t length, struct sample_spec *spec) {
+uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec) {
     assert(spec);
 
-    return (uint32_t) (((double) length /sample_size(spec))/spec->rate*1000000);
+    return (uint32_t) (((double) length /pa_sample_size(spec))/spec->rate*1000000);
 }
index b2f13cc493ada616cd2a1c81836493e2f3515eb2..a4a973bf18fde27aee28f24d563e2840ba2078d4 100644 (file)
@@ -2,10 +2,9 @@
 #define foosamplehfoo
 
 #include <inttypes.h>
+#include <sys/types.h>
 
-#include "memblock.h"
-
-enum sample_format {
+enum pa_sample_format {
     SAMPLE_U8,
     SAMPLE_ALAW,
     SAMPLE_ULAW,
@@ -16,30 +15,14 @@ enum sample_format {
 
 #define SAMPLE_S16NE SAMPLE_S16LE
 
-struct sample_spec {
-    enum sample_format format;
+struct pa_sample_spec {
+    enum pa_sample_format format;
     uint32_t rate;
     uint8_t channels;
 };
 
-#define DEFAULT_SAMPLE_SPEC default_sample_spec
-
-extern struct sample_spec default_sample_spec;
-
-struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec);
-void silence_memchunk(struct memchunk *c, struct sample_spec *spec);
-void silence_memory(void *p, size_t length, struct sample_spec *spec);
-
-struct mix_info {
-    struct memchunk chunk;
-    uint8_t volume;
-    void *userdata;
-};
-
-size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume);
-
-size_t bytes_per_second(struct sample_spec *spec);
-size_t sample_size(struct sample_spec *spec);
-uint32_t samples_usec(size_t length, struct sample_spec *spec);
+size_t pa_bytes_per_second(struct pa_sample_spec *spec);
+size_t pa_sample_size(struct pa_sample_spec *spec);
+uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec);
 
 #endif
diff --git a/src/simple.c b/src/simple.c
new file mode 100644 (file)
index 0000000..a90d22b
--- /dev/null
@@ -0,0 +1,120 @@
+#include "simple.h"
+#include "polyp.h"
+#include "mainloop.h"
+
+struct pa_simple {
+    struct mainloop *mainloop;
+    struct pa_context *context;
+    struct pa_stream *stream;
+
+    size_t requested;
+    int dead;
+};
+
+static void playback_callback(struct pa_stream *p, size_t length, void *userdata) {
+    struct pa_stream *sp = userdata;
+    assert(p && length && sp);
+
+    sp->requested = length;
+}
+
+struct pa_simple* pa_simple_new(
+    const char *server,
+    const char *name,
+    enum pa_stream_direction dir,
+    const char *dev,
+    const char *stream_name,
+    const struct pa_sample_spec *ss,
+    const struct pa_buffer_attr *attr) {
+    
+    struct pa_simple *p;
+    assert(ss);
+
+    p = malloc(sizeof(struct pa_simple));
+    assert(p);
+    p->context = NULL;
+    p->stream = NULL;
+    p->mainloop = pa_mainloop_new();
+    assert(p->mainloop);
+    p->requested = 0;
+    p->dead = 0;
+
+    if (!(p->context = pa_context_new(pa_mainloop_get_api(p->mainloop), name)))
+        goto fail;
+
+    if (pa_context_connect(c, server, NULL, NULL) < 0)
+        goto fail;
+
+    while (!pa_context_is_ready(c)) {
+        if (pa_context_is_dead(c))
+            goto fail;
+        
+        if (mainloop_iterate(p->mainloop) < 0)
+            goto fail;
+    }
+
+    if (!(p->stream = pa_stream_new(p->context, dir, sink, stream_name, ss, attr, NULL, NULL)))
+        goto fail;
+
+    while (!pa_stream_is_ready(c)) {
+        if (pa_stream_is_dead(c))
+            goto fail;
+
+        if (mainloop_iterate(p->mainloop) < 0)
+            goto fail;
+    }
+
+    pa_stream_set_write_callback(p->stream, playback_callback, p);
+
+    return p;
+    
+fail:
+    pa_simple_free(p);
+    return NULL;
+}
+
+void pa_simple_free(struct pa_simple *s) {
+    assert(s);
+
+    if (s->stream)
+        pa_stream_free(s->stream);
+    
+    if (s->context)
+        pa_context_free(s->context);
+
+    if (s->mainloop)
+        mainloop_free(s->mainloop);
+
+    free(s);
+}
+
+int pa_simple_write(struct pa_simple *s, const void*data, size_t length) {
+    assert(s && data);
+
+    while (length > 0) {
+        size_t l;
+        
+        while (!s->requested) {
+            if (pa_context_is_dead(c))
+                return -1;
+            
+            if (mainloop_iterate(s->mainloop) < 0)
+                return -1;
+        }
+
+        l = length;
+        if (l > s->requested)
+            l = s->requested;
+
+        pa_stream_write(s->stream, data, l);
+        data += l;
+        length -= l;
+        s->requested = -l;
+    }
+
+    return 0;
+}
+
+int pa_simple_read(struct pa_simple *s, const void*data, size_t length) {
+    assert(0);
+}
diff --git a/src/simple.h b/src/simple.h
new file mode 100644 (file)
index 0000000..8069305
--- /dev/null
@@ -0,0 +1,25 @@
+#ifndef foosimplehfoo
+#define foosimplehfoo
+
+#include <sys/types.h>
+
+#include "sample.h"
+#include "polypdef.h"
+
+struct pa_simple;
+
+struct pa_simple* pa_simple_new(
+    const char *server,
+    const char *name,
+    enum pa_stream_direction dir,
+    const char *dev,
+    const char *stream_name,
+    const struct pa_sample_spec *ss,
+    const struct pa_buffer_attr *attr);
+
+void pa_simple_free(struct pa_simple *s);
+
+int pa_simple_write(struct pa_simple *s, const void*data, size_t length);
+int pa_simple_read(struct pa_simple *s, const void*data, size_t length);
+
+#endif
index cd12b463fa32ba85fb38be48f0f1efacaa52fc83..a334424c201c8797a01db0838b95440606bc18f6 100644 (file)
@@ -6,10 +6,11 @@
 #include "sink.h"
 #include "sinkinput.h"
 #include "strbuf.h"
+#include "sample-util.h"
 
 #define MAX_MIX_CHANNELS 32
 
-struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) {
+struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec) {
     struct sink *s;
     char *n = NULL;
     int r;
index 394abb5bab230f25e068c62a98238bc8c844c170..d9f80059d229ca60227bd8843da262d3d6ddfe90 100644 (file)
@@ -15,7 +15,7 @@ struct sink {
 
     char *name;
     struct core *core;
-    struct sample_spec sample_spec;
+    struct pa_sample_spec sample_spec;
     struct idxset *inputs;
 
     struct source *monitor_source;
@@ -27,7 +27,7 @@ struct sink {
     void *userdata;
 };
 
-struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec);
+struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec);
 void sink_free(struct sink* s);
 
 int sink_render(struct sink*s, size_t length, struct memchunk *result);
index 2e6a8c36672b220e57c7f38a5ad5424f7c86e4ae..b81c9c7135f8999db4c60bc885a04a59c93a3e56 100644 (file)
@@ -5,7 +5,7 @@
 #include "sinkinput.h"
 #include "strbuf.h"
 
-struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name) {
+struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name) {
     struct sink_input *i;
     int r;
     assert(s && spec);
@@ -14,7 +14,7 @@ struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, cons
     assert(i);
     i->name = name ? strdup(name) : NULL;
     i->sink = s;
-    i->spec = *spec;
+    i->sample_spec = *spec;
 
     i->peek = NULL;
     i->drop = NULL;
index 389d832dd6dfc17e992572cd930dcceb6b212996..f04ecb952c5c14d1f53900c329bb49a6965470b4 100644 (file)
@@ -12,7 +12,7 @@ struct sink_input {
 
     char *name;
     struct sink *sink;
-    struct sample_spec spec;
+    struct pa_sample_spec sample_spec;
     uint8_t volume;
     
     int (*peek) (struct sink_input *i, struct memchunk *chunk);
@@ -23,7 +23,7 @@ struct sink_input {
     void *userdata;
 };
 
-struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name);
+struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name);
 void sink_input_free(struct sink_input* i);
 
 /* Code that didn't create the input stream should call this function to
diff --git a/src/socket-client.c b/src/socket-client.c
new file mode 100644 (file)
index 0000000..812c43f
--- /dev/null
@@ -0,0 +1,177 @@
+#include <unistd.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "socket-client.h"
+#include "util.h"
+
+struct socket_client {
+    struct pa_mainloop_api *mainloop;
+    int fd;
+
+    void *io_source, *fixed_source;
+    void (*callback)(struct socket_client*c, struct iochannel *io, void *userdata);
+    void *userdata;
+};
+
+static struct socket_client*socket_client_new(struct pa_mainloop_api *m) {
+    struct socket_client *c;
+    assert(m);
+
+    c = malloc(sizeof(struct socket_client));
+    assert(c);
+    c->mainloop = m;
+    c->fd = -1;
+    c->io_source = c->fixed_source = NULL;
+    c->callback = NULL;
+    c->userdata = NULL;
+    return c;
+}
+
+static void do_call(struct socket_client *c) {
+    struct iochannel *io;
+    int error, lerror;
+    assert(c && c->callback);
+
+    lerror = sizeof(error);
+    if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &error, &lerror) < 0) {
+        fprintf(stderr, "getsockopt(): %s\n", strerror(errno));
+        goto failed;
+    }
+
+    if (lerror != sizeof(error)) {
+        fprintf(stderr, "getsocktop() returned invalid size.\n");
+        goto failed;
+    }
+
+    if (error != 0) {
+        fprintf(stderr, "connect(): %s\n", strerror(error));
+        goto failed;
+    }
+        
+    io = iochannel_new(c->mainloop, c->fd, c->fd);
+    assert(io);
+    c->fd = -1;
+    c->callback(c, io, c->userdata);
+
+    return;
+    
+failed:
+    close(c->fd);
+    c->fd = -1;
+    c->callback(c, NULL, c->userdata);
+    return;
+}
+
+static void connect_fixed_cb(struct pa_mainloop_api *m, void *id, void *userdata) {
+    struct socket_client *c = userdata;
+    assert(m && c && c->fixed_source == id);
+    m->cancel_fixed(m, c->fixed_source);
+    c->fixed_source = NULL;
+    do_call(c);
+}
+
+static void connect_io_cb(struct pa_mainloop_api*m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
+    struct socket_client *c = userdata;
+    assert(m && c && c->io_source == id && fd >= 0 && events == PA_MAINLOOP_API_IO_EVENT_OUTPUT);
+    m->cancel_io(m, c->io_source);
+    c->io_source = NULL;
+    do_call(c);
+}
+
+static int do_connect(struct socket_client *c, const struct sockaddr *sa, socklen_t len) {
+    int r;
+    assert(c && sa && len);
+    
+    make_nonblock_fd(c->fd);
+    
+    if ((r = connect(c->fd, sa, len)) < 0) {
+        if (r != EINPROGRESS) {
+            fprintf(stderr, "connect(): %s\n", strerror(errno));
+            return -1;
+        }
+
+        c->io_source = c->mainloop->source_io(c->mainloop, c->fd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, connect_io_cb, c);
+        assert(c->io_source);
+    } else {
+        c->fixed_source = c->mainloop->source_fixed(c->mainloop, connect_fixed_cb, c);
+        assert(c->io_source);
+    }
+
+    return 0;
+}
+
+struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) {
+    struct socket_client *c;
+    struct sockaddr_in sa;
+
+    c = socket_client_new(m);
+    assert(c);
+
+    if ((c->fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+        fprintf(stderr, "socket(): %s\n", strerror(errno));
+        goto fail;
+    }
+
+    sa.sin_family = AF_INET;
+    sa.sin_port = htons(port);
+    sa.sin_addr.s_addr = htonl(address);
+
+    if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0)
+        goto fail;
+    
+    return c;
+
+fail:
+    socket_client_free(c);
+    return NULL;
+}
+
+struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename) {
+    struct socket_client *c;
+    struct sockaddr_un sa;
+    
+    c = socket_client_new(m);
+    assert(c);
+
+    if ((c->fd = socket(PF_LOCAL, SOCK_STREAM, 0)) < 0) {
+        fprintf(stderr, "socket(): %s\n", strerror(errno));
+        goto fail;
+    }
+
+    sa.sun_family = AF_LOCAL;
+    strncpy(sa.sun_path, filename, sizeof(sa.sun_path)-1);
+    sa.sun_path[sizeof(sa.sun_path) - 1] = 0;
+    
+    if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0)
+        goto fail;
+    
+    return c;
+
+fail:
+    socket_client_free(c);
+    return NULL;
+}
+    
+void socket_client_free(struct socket_client *c) {
+    assert(c && c->mainloop);
+    if (c->io_source)
+        c->mainloop->cancel_io(c->mainloop, c->io_source);
+    if (c->fixed_source)
+        c->mainloop->cancel_fixed(c->mainloop, c->fixed_source);
+    if (c->fd >= 0)
+        close(c->fd);
+    free(c);
+}
+
+void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata) {
+    assert(c);
+    c->callback = on_connection;
+    c->userdata = userdata;
+}
diff --git a/src/socket-client.h b/src/socket-client.h
new file mode 100644 (file)
index 0000000..4de01e3
--- /dev/null
@@ -0,0 +1,17 @@
+#ifndef foosocketclienthfoo
+#define foosocketclienthfoo
+
+#include <inttypes.h>
+#include "mainloop-api.h"
+#include "iochannel.h"
+
+struct socket_client;
+
+struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);
+struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename);
+
+void socket_client_free(struct socket_client *c);
+
+void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata);
+
+#endif
index 6ad225e395bc122e39e3181fbfbd50ec2e24338f..87fe147680181ecd87481dccdbc6bc1de570ec86 100644 (file)
@@ -19,14 +19,15 @@ struct socket_server {
     void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata);
     void *userdata;
 
-    struct mainloop_source *mainloop_source;
+    void *mainloop_source;
+    struct pa_mainloop_api *mainloop;
 };
 
-static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event event, void *userdata) {
+static void callback(struct pa_mainloop_api *mainloop, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
     struct socket_server *s = userdata;
     struct iochannel *io;
     int nfd;
-    assert(src && fd >= 0 && fd == s->fd && event == MAINLOOP_IO_EVENT_IN && s);
+    assert(s && s->mainloop == mainloop && s->mainloop_source == id && id && fd >= 0 && fd == s->fd && events == PA_MAINLOOP_API_IO_EVENT_INPUT);
 
     if ((nfd = accept(fd, NULL, NULL)) < 0) {
         fprintf(stderr, "accept(): %s\n", strerror(errno));
@@ -38,12 +39,12 @@ static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event
         return;
     }
 
-    io = iochannel_new(mainloop_source_get_mainloop(src), nfd, nfd);
+    io = iochannel_new(s->mainloop, nfd, nfd);
     assert(io);
     s->on_connection(s, io, s->userdata);
 }
 
-struct socket_server* socket_server_new(struct mainloop *m, int fd) {
+struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd) {
     struct socket_server *s;
     assert(m && fd >= 0);
     
@@ -54,13 +55,14 @@ struct socket_server* socket_server_new(struct mainloop *m, int fd) {
     s->on_connection = NULL;
     s->userdata = NULL;
 
-    s->mainloop_source = mainloop_source_new_io(m, fd, MAINLOOP_IO_EVENT_IN, callback, s);
+    s->mainloop = m;
+    s->mainloop_source = m->source_io(m, fd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, s);
     assert(s->mainloop_source);
     
     return s;
 }
 
-struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename) {
+struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename) {
     int fd = -1;
     struct sockaddr_un sa;
     struct socket_server *s;
@@ -101,7 +103,7 @@ fail:
     return NULL;
 }
 
-struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port) {
+struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) {
     int fd = -1;
     struct sockaddr_in sa;
     int on = 1;
@@ -148,7 +150,8 @@ void socket_server_free(struct socket_server*s) {
         free(s->filename);
     }
 
-    mainloop_source_free(s->mainloop_source);
+    
+    s->mainloop->cancel_io(s->mainloop, s->mainloop_source);
     
     free(s);
 }
index 4814fc62633fc1b235bda118449846e43edb2866..80895f8db21cbca678efc9aaeb2e413e3245fb03 100644 (file)
@@ -2,14 +2,14 @@
 #define foosocketserverhfoo
 
 #include <inttypes.h>
-#include "mainloop.h"
+#include "mainloop-api.h"
 #include "iochannel.h"
 
 struct socket_server;
 
-struct socket_server* socket_server_new(struct mainloop *m, int fd);
-struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename);
-struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port);
+struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd);
+struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename);
+struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);
 
 void socket_server_free(struct socket_server*s);
 
index 44d775327e01aa7cd6fb863c0d767f318575de6c..21ac24f3533a44df0f674f72170409b16b6730e2 100644 (file)
@@ -7,7 +7,7 @@
 #include "sourceoutput.h"
 #include "strbuf.h"
 
-struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) {
+struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec) {
     struct source *s;
     int r;
     assert(core && spec);
index 078fb1c9a4d1ecb510b7dbbf439c3a6415684c34..04f3984f7379816c8acdf72a0e553f8ec20bad76 100644 (file)
@@ -14,14 +14,14 @@ struct source {
     
     char *name;
     struct core *core;
-    struct sample_spec sample_spec;
+    struct pa_sample_spec sample_spec;
     struct idxset *outputs;
 
     void (*notify)(struct source*source);
     void *userdata;
 };
 
-struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec);
+struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec);
 void source_free(struct source *s);
 
 /* Pass a new memory block to all output streams */
index 8021b522bb0f0421025dacefa85c3c8e79cc9f08..e2e1dacc3cf5eebe7cfcda1d92e768181ee1648b 100644 (file)
@@ -5,7 +5,7 @@
 #include "sourceoutput.h"
 #include "strbuf.h"
 
-struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name) {
+struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name) {
     struct source_output *o;
     int r;
     assert(s && spec);
@@ -14,7 +14,7 @@ struct source_output* source_output_new(struct source *s, struct sample_spec *sp
     assert(o);
     o->name = name ? strdup(name) : NULL;
     o->source = s;
-    o->spec = *spec;
+    o->sample_spec = *spec;
 
     o->push = NULL;
     o->kill = NULL;
index 359ff1511fb191c7d52aa7ff401e7058293a5559..50cb9caf5f16a333860da383e82bbde7239d4132 100644 (file)
@@ -12,7 +12,7 @@ struct source_output {
 
     char *name;
     struct source *source;
-    struct sample_spec spec;
+    struct pa_sample_spec sample_spec;
     
     void (*push)(struct source_output *o, struct memchunk *chunk);
     void (*kill)(struct source_output* o);
@@ -20,7 +20,7 @@ struct source_output {
     void *userdata;
 };
 
-struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name);
+struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name);
 void source_output_free(struct source_output* o);
 
 void source_output_kill(struct source_output*o);
index 429dd40885a61682e65b0b8c6302086839c4e5fe..47e17839bf840c6356c2cb094013b8b4015cbbd4 100644 (file)
@@ -90,7 +90,7 @@ void tagstruct_putu8(struct tagstruct*t, uint8_t c) {
     t->length += 2;
 }
 
-void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss) {
+void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss) {
     assert(t && ss);
     extend(t, 7);
     t->data[t->length] = TAG_SAMPLE_SPEC;
@@ -156,7 +156,7 @@ int tagstruct_getu8(struct tagstruct*t, uint8_t *c) {
     return 0;
 }
 
-int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss) {
+int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss) {
     assert(t && ss);
 
     if (t->rindex+7 > t->length)
index 5572c64c6a3fa1d0d2c6a6da1dc8624f4cf6092a..9f6a0bf4aad9c8e7b1c17905c1afa98f6d83107b 100644 (file)
@@ -15,12 +15,12 @@ uint8_t* tagstruct_free_data(struct tagstruct*t, size_t *l);
 void tagstruct_puts(struct tagstruct*t, const char *s);
 void tagstruct_putu32(struct tagstruct*t, uint32_t i);
 void tagstruct_putu8(struct tagstruct*t, uint8_t c);
-void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss);
+void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss);
 
 int tagstruct_gets(struct tagstruct*t, const char **s);
 int tagstruct_getu32(struct tagstruct*t, uint32_t *i);
 int tagstruct_getu8(struct tagstruct*t, uint8_t *c);
-int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss);
+int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss);
 
 int tagstruct_eof(struct tagstruct*t);
 const uint8_t* tagstruct_data(struct tagstruct*t, size_t *l);
index aeb7ae5fc16e7d6c7143d35dd5aee655e658eb9c..47344ab42472953d0996a6e838ea13008508ac69 100644 (file)
--- a/src/todo
+++ b/src/todo
@@ -1,8 +1,10 @@
+- sync() function in native library
+- name registrar
 - native protocol/library
 - simple control protocol: kill client/input/output; set_volume
 - resampling
 - esound protocol
-- config parser
+- config parser/cmdline
 - record testing
 -- 0.1
 - optimierung von rebuild_pollfds()
diff --git a/src/util.c b/src/util.c
new file mode 100644 (file)
index 0000000..0383a0a
--- /dev/null
@@ -0,0 +1,62 @@
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <fcntl.h>
+
+#include "util.h"
+
+void make_nonblock_fd(int fd) {
+    int v;
+
+    if ((v = fcntl(fd, F_GETFL)) >= 0)
+        if (!(v & O_NONBLOCK))
+            fcntl(fd, F_SETFL, v|O_NONBLOCK);
+}
+
+void peer_to_string(char *c, size_t l, int fd) {
+    struct stat st;
+
+    assert(c && l && fd >= 0);
+    
+    if (fstat(fd, &st) < 0) {
+        snprintf(c, l, "Invalid client fd");
+        return;
+    }
+
+    if (S_ISSOCK(st.st_mode)) {
+        union {
+            struct sockaddr sa;
+            struct sockaddr_in in;
+            struct sockaddr_un un;
+        } sa;
+        socklen_t sa_len = sizeof(sa);
+        
+        if (getpeername(fd, &sa.sa, &sa_len) >= 0) {
+
+            if (sa.sa.sa_family == AF_INET) {
+                uint32_t ip = ntohl(sa.in.sin_addr.s_addr);
+                
+                snprintf(c, l, "TCP/IP client from %i.%i.%i.%i:%u",
+                         ip >> 24,
+                         (ip >> 16) & 0xFF,
+                         (ip >> 8) & 0xFF,
+                         ip & 0xFF,
+                         ntohs(sa.in.sin_port));
+                return;
+            } else if (sa.sa.sa_family == AF_LOCAL) {
+                snprintf(c, l, "UNIX client for %s", sa.un.sun_path);
+                return;
+            }
+
+        }
+        snprintf(c, l, "Unknown network client");
+        return;
+    } else if (S_ISCHR(st.st_mode) && (fd == 0 || fd == 1)) {
+        snprintf(c, l, "STDIN/STDOUT client");
+        return;
+    }
+
+    snprintf(c, l, "Unknown client");
+}
diff --git a/src/util.h b/src/util.h
new file mode 100644 (file)
index 0000000..830ee2e
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef fooutilhfoo
+#define fooutilhfoo
+
+void make_nonblock_fd(int fd);
+
+void peer_to_string(char *c, size_t l, int fd);
+
+#endif