code.delx.au - pulseaudio/blobdiff - src/modules/module-zeroconf-publish.c
zeroconf-publish: Don't react to messages while shutting down
[pulseaudio] / src / modules / module-zeroconf-publish.c
index 6ed8e3d9a79427f8e1aaae1e8f74a388c2d5674d..db928507ab99aa6aaa4edba90b69e6f54cc97e53 100644 (file)
@@ -1,5 +1,3 @@
-/* $Id$ */
-
 /***
   This file is part of PulseAudio.
 
 /***
   This file is part of PulseAudio.
 
@@ -7,7 +5,7 @@
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as
 
   PulseAudio is free software; you can redistribute it and/or modify
   it under the terms of the GNU Lesser General Public License as
-  published by the Free Software Foundation; either version 2 of the
+  published by the Free Software Foundation; either version 2.1 of the
   License, or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
   License, or (at your option) any later version.
 
   PulseAudio is distributed in the hope that it will be useful, but
@@ -27,7 +25,6 @@
 
 #include <stdio.h>
 #include <stdlib.h>
 
 #include <stdio.h>
 #include <stdlib.h>
-#include <string.h>
 #include <unistd.h>
 
 #include <avahi-client/client.h>
 #include <unistd.h>
 
 #include <avahi-client/client.h>
 
 #include <pulse/xmalloc.h>
 #include <pulse/util.h>
 
 #include <pulse/xmalloc.h>
 #include <pulse/util.h>
+#include <pulse/thread-mainloop.h>
 
 
+#include <pulsecore/parseaddr.h>
 #include <pulsecore/sink.h>
 #include <pulsecore/source.h>
 #include <pulsecore/native-common.h>
 #include <pulsecore/core-util.h>
 #include <pulsecore/log.h>
 #include <pulsecore/sink.h>
 #include <pulsecore/source.h>
 #include <pulsecore/native-common.h>
 #include <pulsecore/core-util.h>
 #include <pulsecore/log.h>
-#include <pulsecore/core-subscribe.h>
 #include <pulsecore/dynarray.h>
 #include <pulsecore/modargs.h>
 #include <pulsecore/avahi-wrap.h>
 #include <pulsecore/dynarray.h>
 #include <pulsecore/modargs.h>
 #include <pulsecore/avahi-wrap.h>
-#include <pulsecore/endianmacros.h>
+#include <pulsecore/protocol-native.h>
 
 #include "module-zeroconf-publish-symdef.h"
 
 PA_MODULE_AUTHOR("Lennart Poettering");
 PA_MODULE_DESCRIPTION("mDNS/DNS-SD Service Publisher");
 PA_MODULE_VERSION(PACKAGE_VERSION);
 
 #include "module-zeroconf-publish-symdef.h"
 
 PA_MODULE_AUTHOR("Lennart Poettering");
 PA_MODULE_DESCRIPTION("mDNS/DNS-SD Service Publisher");
 PA_MODULE_VERSION(PACKAGE_VERSION);
-PA_MODULE_LOAD_ONCE(TRUE);
-PA_MODULE_USAGE("port=<IP port number>");
+PA_MODULE_LOAD_ONCE(true);
 
 #define SERVICE_TYPE_SINK "_pulse-sink._tcp"
 #define SERVICE_TYPE_SOURCE "_pulse-source._tcp"
 
 #define SERVICE_TYPE_SINK "_pulse-sink._tcp"
 #define SERVICE_TYPE_SOURCE "_pulse-source._tcp"
@@ -68,11 +65,39 @@ PA_MODULE_USAGE("port=<IP port number>");
 #define SERVICE_SUBTYPE_SOURCE_MONITOR "_monitor._sub."SERVICE_TYPE_SOURCE
 #define SERVICE_SUBTYPE_SOURCE_NON_MONITOR "_non-monitor._sub."SERVICE_TYPE_SOURCE
 
 #define SERVICE_SUBTYPE_SOURCE_MONITOR "_monitor._sub."SERVICE_TYPE_SOURCE
 #define SERVICE_SUBTYPE_SOURCE_NON_MONITOR "_non-monitor._sub."SERVICE_TYPE_SOURCE
 
+/*
+ * Note: Because the core avahi-client calls result in synchronous D-Bus
+ * communication, calling any of those functions in the PA mainloop context
+ * could lead to the mainloop being blocked for long periods.
+ *
+ * To avoid this, we create a threaded mainloop for Avahi calls, and push all
+ * D-Bus communication into that thread. The rule of thumb for the split is:
+ *
+ * 1. If access to PA data structures is needed, use the PA mainloop context
+ *
+ * 2. If a (blocking) avahi-client call is needed, use the Avahi mainloop
+ *
+ * We use a message queue to pass messages from the Avahi mainloop to the PA
+ * mainloop.
+ */
+
 static const char* const valid_modargs[] = {
 static const char* const valid_modargs[] = {
-    "port",
     NULL
 };
 
     NULL
 };
 
+struct avahi_msg {
+    pa_msgobject parent;
+};
+
+typedef struct avahi_msg avahi_msg;
+PA_DEFINE_PRIVATE_CLASS(avahi_msg, pa_msgobject);
+
+enum {
+    AVAHI_MESSAGE_PUBLISH_ALL,
+    AVAHI_MESSAGE_SHUTDOWN_START,
+    AVAHI_MESSAGE_SHUTDOWN_COMPLETE,
+};
+
 enum service_subtype {
     SUBTYPE_HARDWARE,
     SUBTYPE_VIRTUAL,
 enum service_subtype {
     SUBTYPE_HARDWARE,
     SUBTYPE_VIRTUAL,
@@ -80,72 +105,107 @@ enum service_subtype {
 };
 
 struct service {
 };
 
 struct service {
+    void *key;
+
     struct userdata *userdata;
     AvahiEntryGroup *entry_group;
     char *service_name;
     struct userdata *userdata;
     AvahiEntryGroup *entry_group;
     char *service_name;
-    pa_object *device;
+    const char *service_type;
     enum service_subtype subtype;
     enum service_subtype subtype;
+
+    char *name;
+    bool is_sink;
+    pa_sample_spec ss;
+    pa_channel_map cm;
+    pa_proplist *proplist;
 };
 
 struct userdata {
 };
 
 struct userdata {
+    pa_thread_mq thread_mq;
+    pa_rtpoll *rtpoll;
+    avahi_msg *msg;
+
     pa_core *core;
     pa_module *module;
     pa_core *core;
     pa_module *module;
+    pa_mainloop_api *api;
+    pa_threaded_mainloop *mainloop;
+
     AvahiPoll *avahi_poll;
     AvahiClient *client;
 
     AvahiPoll *avahi_poll;
     AvahiClient *client;
 
-    pa_hashmap *services;
+    pa_hashmap *services; /* protect with mainloop lock */
     char *service_name;
 
     AvahiEntryGroup *main_entry_group;
 
     char *service_name;
 
     AvahiEntryGroup *main_entry_group;
 
-    uint16_t port;
-
     pa_hook_slot *sink_new_slot, *source_new_slot, *sink_unlink_slot, *source_unlink_slot, *sink_changed_slot, *source_changed_slot;
     pa_hook_slot *sink_new_slot, *source_new_slot, *sink_unlink_slot, *source_unlink_slot, *sink_changed_slot, *source_changed_slot;
-};
-
-static void get_service_data(struct service *s, pa_sample_spec *ret_ss, pa_channel_map *ret_map, const char **ret_name, const char **ret_description, enum service_subtype *ret_subtype) {
-    pa_assert(s);
-    pa_assert(ret_ss);
-    pa_assert(ret_description);
-    pa_assert(ret_subtype);
 
 
-    if (pa_sink_isinstance(s->device)) {
-        pa_sink *sink = PA_SINK(s->device);
+    pa_native_protocol *native;
 
 
-        *ret_ss = sink->sample_spec;
-        *ret_map = sink->channel_map;
-        *ret_name = sink->name;
-        *ret_description = pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION));
-        *ret_subtype = sink->flags & PA_SINK_HARDWARE ? SUBTYPE_HARDWARE : SUBTYPE_VIRTUAL;
+    bool shutting_down;
+};
 
 
-    } else if (pa_source_isinstance(s->device)) {
-        pa_source *source = PA_SOURCE(s->device);
+/* Runs in PA mainloop context */
+static void get_service_data(struct service *s, pa_object *device) {
+    pa_assert(s);
 
 
-        *ret_ss = source->sample_spec;
-        *ret_map = source->channel_map;
-        *ret_name = source->name;
-        *ret_description = pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION));
-        *ret_subtype = source->monitor_of ? SUBTYPE_MONITOR : (source->flags & PA_SOURCE_HARDWARE ? SUBTYPE_HARDWARE : SUBTYPE_VIRTUAL);
+    if (pa_sink_isinstance(device)) {
+        pa_sink *sink = PA_SINK(device);
+
+        s->is_sink = true;
+        s->service_type = SERVICE_TYPE_SINK;
+        s->ss = sink->sample_spec;
+        s->cm = sink->channel_map;
+        s->name = pa_xstrdup(sink->name);
+        s->proplist = pa_proplist_copy(sink->proplist);
+        s->subtype = sink->flags & PA_SINK_HARDWARE ? SUBTYPE_HARDWARE : SUBTYPE_VIRTUAL;
+
+    } else if (pa_source_isinstance(device)) {
+        pa_source *source = PA_SOURCE(device);
+
+        s->is_sink = false;
+        s->service_type = SERVICE_TYPE_SOURCE;
+        s->ss = source->sample_spec;
+        s->cm = source->channel_map;
+        s->name = pa_xstrdup(source->name);
+        s->proplist = pa_proplist_copy(source->proplist);
+        s->subtype = source->monitor_of ? SUBTYPE_MONITOR : (source->flags & PA_SOURCE_HARDWARE ? SUBTYPE_HARDWARE : SUBTYPE_VIRTUAL);
 
     } else
         pa_assert_not_reached();
 }
 
 
     } else
         pa_assert_not_reached();
 }
 
+/* Can be used in either PA or Avahi mainloop context since the bits of u->core
+ * that we access don't change after startup. */
 static AvahiStringList* txt_record_server_data(pa_core *c, AvahiStringList *l) {
     char s[128];
 static AvahiStringList* txt_record_server_data(pa_core *c, AvahiStringList *l) {
     char s[128];
+    char *t;
 
     pa_assert(c);
 
     l = avahi_string_list_add_pair(l, "server-version", PACKAGE_NAME" "PACKAGE_VERSION);
 
     pa_assert(c);
 
     l = avahi_string_list_add_pair(l, "server-version", PACKAGE_NAME" "PACKAGE_VERSION);
-    l = avahi_string_list_add_pair(l, "user-name", pa_get_user_name(s, sizeof(s)));
+
+    t = pa_get_user_name_malloc();
+    l = avahi_string_list_add_pair(l, "user-name", t);
+    pa_xfree(t);
+
+    t = pa_machine_id();
+    l = avahi_string_list_add_pair(l, "machine-id", t);
+    pa_xfree(t);
+
+    t = pa_uname_string();
+    l = avahi_string_list_add_pair(l, "uname", t);
+    pa_xfree(t);
+
     l = avahi_string_list_add_pair(l, "fqdn", pa_get_fqdn(s, sizeof(s)));
     l = avahi_string_list_add_printf(l, "cookie=0x%08x", c->cookie);
 
     return l;
 }
 
     l = avahi_string_list_add_pair(l, "fqdn", pa_get_fqdn(s, sizeof(s)));
     l = avahi_string_list_add_printf(l, "cookie=0x%08x", c->cookie);
 
     return l;
 }
 
-static int publish_service(struct service *s);
+static void publish_service(pa_mainloop_api *api, void *service);
 
 
+/* Runs in Avahi mainloop context */
 static void service_entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata) {
     struct service *s = userdata;
 
 static void service_entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata) {
     struct service *s = userdata;
 
@@ -165,7 +225,7 @@ static void service_entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupStat
             pa_xfree(s->service_name);
             s->service_name = t;
 
             pa_xfree(s->service_name);
             s->service_name = t;
 
-            publish_service(s);
+            publish_service(NULL, s);
             break;
         }
 
             break;
         }
 
@@ -186,14 +246,38 @@ static void service_entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupStat
 
 static void service_free(struct service *s);
 
 
 static void service_free(struct service *s);
 
-static int publish_service(struct service *s) {
+/* Can run in either context */
+static uint16_t compute_port(struct userdata *u) {
+    pa_strlist *i;
+
+    pa_assert(u);
+
+    for (i = pa_native_protocol_servers(u->native); i; i = pa_strlist_next(i)) {
+        pa_parsed_address a;
+
+        if (pa_parse_address(pa_strlist_data(i), &a) >= 0 &&
+            (a.type == PA_PARSED_ADDRESS_TCP4 ||
+             a.type == PA_PARSED_ADDRESS_TCP6 ||
+             a.type == PA_PARSED_ADDRESS_TCP_AUTO) &&
+            a.port > 0) {
+
+            pa_xfree(a.path_or_host);
+            return a.port;
+        }
+
+        pa_xfree(a.path_or_host);
+    }
+
+    return PA_NATIVE_DEFAULT_PORT;
+}
+
+/* Runs in Avahi mainloop context */
+static void publish_service(pa_mainloop_api *api PA_GCC_UNUSED, void *service) {
+    struct service *s = (struct service *) service;
     int r = -1;
     AvahiStringList *txt = NULL;
     int r = -1;
     AvahiStringList *txt = NULL;
-    const char *description = NULL, *name = NULL;
-    pa_sample_spec ss;
-    pa_channel_map map;
     char cm[PA_CHANNEL_MAP_SNPRINT_MAX];
     char cm[PA_CHANNEL_MAP_SNPRINT_MAX];
-    enum service_subtype subtype;
+    const char *t;
 
     const char * const subtype_text[] = {
         [SUBTYPE_HARDWARE] = "hardware",
 
     const char * const subtype_text[] = {
         [SUBTYPE_HARDWARE] = "hardware",
@@ -204,7 +288,7 @@ static int publish_service(struct service *s) {
     pa_assert(s);
 
     if (!s->userdata->client || avahi_client_get_state(s->userdata->client) != AVAHI_CLIENT_S_RUNNING)
     pa_assert(s);
 
     if (!s->userdata->client || avahi_client_get_state(s->userdata->client) != AVAHI_CLIENT_S_RUNNING)
-        return 0;
+        return;
 
     if (!s->entry_group) {
         if (!(s->entry_group = avahi_entry_group_new(s->userdata->client, service_entry_group_callback, s))) {
 
     if (!s->entry_group) {
         if (!(s->entry_group = avahi_entry_group_new(s->userdata->client, service_entry_group_callback, s))) {
@@ -216,23 +300,35 @@ static int publish_service(struct service *s) {
 
     txt = txt_record_server_data(s->userdata->core, txt);
 
 
     txt = txt_record_server_data(s->userdata->core, txt);
 
-    get_service_data(s, &ss, &map, &name, &description, &subtype);
-    txt = avahi_string_list_add_pair(txt, "device", name);
-    txt = avahi_string_list_add_printf(txt, "rate=%u", ss.rate);
-    txt = avahi_string_list_add_printf(txt, "channels=%u", ss.channels);
-    txt = avahi_string_list_add_pair(txt, "format", pa_sample_format_to_string(ss.format));
-    txt = avahi_string_list_add_pair(txt, "channel_map", pa_channel_map_snprint(cm, sizeof(cm), &map));
-    txt = avahi_string_list_add_pair(txt, "subtype", subtype_text[subtype]);
+    txt = avahi_string_list_add_pair(txt, "device", s->name);
+    txt = avahi_string_list_add_printf(txt, "rate=%u", s->ss.rate);
+    txt = avahi_string_list_add_printf(txt, "channels=%u", s->ss.channels);
+    txt = avahi_string_list_add_pair(txt, "format", pa_sample_format_to_string(s->ss.format));
+    txt = avahi_string_list_add_pair(txt, "channel_map", pa_channel_map_snprint(cm, sizeof(cm), &s->cm));
+    txt = avahi_string_list_add_pair(txt, "subtype", subtype_text[s->subtype]);
+
+    if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION)))
+        txt = avahi_string_list_add_pair(txt, "description", t);
+    if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_ICON_NAME)))
+        txt = avahi_string_list_add_pair(txt, "icon-name", t);
+    if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_VENDOR_NAME)))
+        txt = avahi_string_list_add_pair(txt, "vendor-name", t);
+    if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_PRODUCT_NAME)))
+        txt = avahi_string_list_add_pair(txt, "product-name", t);
+    if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_CLASS)))
+        txt = avahi_string_list_add_pair(txt, "class", t);
+    if ((t = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_FORM_FACTOR)))
+        txt = avahi_string_list_add_pair(txt, "form-factor", t);
 
     if (avahi_entry_group_add_service_strlst(
                 s->entry_group,
                 AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
                 0,
                 s->service_name,
 
     if (avahi_entry_group_add_service_strlst(
                 s->entry_group,
                 AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
                 0,
                 s->service_name,
-                pa_sink_isinstance(s->device) ? SERVICE_TYPE_SINK : SERVICE_TYPE_SOURCE,
+                s->service_type,
                 NULL,
                 NULL,
                 NULL,
                 NULL,
-                s->userdata->port,
+                compute_port(s->userdata),
                 txt) < 0) {
 
         pa_log("avahi_entry_group_add_service_strlst(): %s", avahi_strerror(avahi_client_errno(s->userdata->client)));
                 txt) < 0) {
 
         pa_log("avahi_entry_group_add_service_strlst(): %s", avahi_strerror(avahi_client_errno(s->userdata->client)));
@@ -244,16 +340,16 @@ static int publish_service(struct service *s) {
                 AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
                 0,
                 s->service_name,
                 AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
                 0,
                 s->service_name,
-                pa_sink_isinstance(s->device) ? SERVICE_TYPE_SINK : SERVICE_TYPE_SOURCE,
+                s->service_type,
                 NULL,
                 NULL,
-                pa_sink_isinstance(s->device) ? (subtype == SUBTYPE_HARDWARE ? SERVICE_SUBTYPE_SINK_HARDWARE : SERVICE_SUBTYPE_SINK_VIRTUAL) :
-                (subtype == SUBTYPE_HARDWARE ? SERVICE_SUBTYPE_SOURCE_HARDWARE : (subtype == SUBTYPE_VIRTUAL ? SERVICE_SUBTYPE_SOURCE_VIRTUAL : SERVICE_SUBTYPE_SOURCE_MONITOR))) < 0) {
+                s->is_sink ? (s->subtype == SUBTYPE_HARDWARE ? SERVICE_SUBTYPE_SINK_HARDWARE : SERVICE_SUBTYPE_SINK_VIRTUAL) :
+                (s->subtype == SUBTYPE_HARDWARE ? SERVICE_SUBTYPE_SOURCE_HARDWARE : (s->subtype == SUBTYPE_VIRTUAL ? SERVICE_SUBTYPE_SOURCE_VIRTUAL : SERVICE_SUBTYPE_SOURCE_MONITOR))) < 0) {
 
         pa_log("avahi_entry_group_add_service_subtype(): %s", avahi_strerror(avahi_client_errno(s->userdata->client)));
         goto finish;
     }
 
 
         pa_log("avahi_entry_group_add_service_subtype(): %s", avahi_strerror(avahi_client_errno(s->userdata->client)));
         goto finish;
     }
 
-    if (pa_source_isinstance(s->device) && subtype != SUBTYPE_MONITOR) {
+    if (!s->is_sink && s->subtype != SUBTYPE_MONITOR) {
         if (avahi_entry_group_add_service_subtype(
                     s->entry_group,
                     AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
         if (avahi_entry_group_add_service_subtype(
                     s->entry_group,
                     AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
@@ -279,64 +375,73 @@ static int publish_service(struct service *s) {
 finish:
 
     /* Remove this service */
 finish:
 
     /* Remove this service */
-    if (r < 0)
+    if (r < 0) {
+        pa_hashmap_remove(s->userdata->services, s->key);
         service_free(s);
         service_free(s);
+    }
 
     avahi_string_list_free(txt);
 
     avahi_string_list_free(txt);
-
-    return r;
 }
 
 }
 
+/* Runs in PA mainloop context */
 static struct service *get_service(struct userdata *u, pa_object *device) {
     struct service *s;
 static struct service *get_service(struct userdata *u, pa_object *device) {
     struct service *s;
-    char hn[64], un[64];
+    char *hn, *un;
     const char *n;
 
     pa_assert(u);
     pa_object_assert_ref(device);
 
     const char *n;
 
     pa_assert(u);
     pa_object_assert_ref(device);
 
+    pa_threaded_mainloop_lock(u->mainloop);
+
     if ((s = pa_hashmap_get(u->services, device)))
     if ((s = pa_hashmap_get(u->services, device)))
-        return s;
+        goto out;
 
     s = pa_xnew(struct service, 1);
 
     s = pa_xnew(struct service, 1);
+    s->key = device;
     s->userdata = u;
     s->entry_group = NULL;
     s->userdata = u;
     s->entry_group = NULL;
-    s->device = device;
 
 
-    if (pa_sink_isinstance(device)) {
-        if (!(n = pa_proplist_gets(PA_SINK(device)->proplist, PA_PROP_DEVICE_DESCRIPTION)))
-            n = PA_SINK(device)->name;
-    } else {
-        if (!(n = pa_proplist_gets(PA_SOURCE(device)->proplist, PA_PROP_DEVICE_DESCRIPTION)))
-            n = PA_SOURCE(device)->name;
-    }
+    get_service_data(s, device);
+
+    if (!(n = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION)))
+        n = s->name;
 
 
-    s->service_name = pa_truncate_utf8(pa_sprintf_malloc("%s@%s: %s",
-                                                         pa_get_user_name(un, sizeof(un)),
-                                                         pa_get_host_name(hn, sizeof(hn)),
-                                                         n),
-                                       AVAHI_LABEL_MAX-1);
+    hn = pa_get_host_name_malloc();
+    un = pa_get_user_name_malloc();
 
 
-    pa_hashmap_put(u->services, s->device, s);
+    s->service_name = pa_truncate_utf8(pa_sprintf_malloc("%s@%s: %s", un, hn, n), AVAHI_LABEL_MAX-1);
+
+    pa_xfree(un);
+    pa_xfree(hn);
+
+    pa_hashmap_put(u->services, device, s);
+
+out:
+    pa_threaded_mainloop_unlock(u->mainloop);
 
     return s;
 }
 
 
     return s;
 }
 
+/* Runs in Avahi mainloop context, or in PA mainloop context with the Avahi mainloop lock held */
 static void service_free(struct service *s) {
     pa_assert(s);
 
 static void service_free(struct service *s) {
     pa_assert(s);
 
-    pa_hashmap_remove(s->userdata->services, s->device);
-
     if (s->entry_group) {
         pa_log_debug("Removing entry group for %s.", s->service_name);
         avahi_entry_group_free(s->entry_group);
     }
 
     pa_xfree(s->service_name);
     if (s->entry_group) {
         pa_log_debug("Removing entry group for %s.", s->service_name);
         avahi_entry_group_free(s->entry_group);
     }
 
     pa_xfree(s->service_name);
+
+    pa_xfree(s->name);
+    pa_proplist_free(s->proplist);
+
     pa_xfree(s);
 }
 
     pa_xfree(s);
 }
 
-static pa_bool_t shall_ignore(pa_object *o) {
+/* Runs in PA mainloop context */
+static bool shall_ignore(pa_object *o) {
     pa_object_assert_ref(o);
 
     if (pa_sink_isinstance(o))
     pa_object_assert_ref(o);
 
     if (pa_sink_isinstance(o))
@@ -348,30 +453,40 @@ static pa_bool_t shall_ignore(pa_object *o) {
     pa_assert_not_reached();
 }
 
     pa_assert_not_reached();
 }
 
+/* Runs in PA mainloop context */
 static pa_hook_result_t device_new_or_changed_cb(pa_core *c, pa_object *o, struct userdata *u) {
     pa_assert(c);
     pa_object_assert_ref(o);
 
 static pa_hook_result_t device_new_or_changed_cb(pa_core *c, pa_object *o, struct userdata *u) {
     pa_assert(c);
     pa_object_assert_ref(o);
 
-    if (!shall_ignore(o))
-        publish_service(get_service(u, o));
+    if (!shall_ignore(o)) {
+        pa_threaded_mainloop_lock(u->mainloop);
+        pa_mainloop_api_once(u->api, publish_service, get_service(u, o));
+        pa_threaded_mainloop_unlock(u->mainloop);
+    }
 
     return PA_HOOK_OK;
 }
 
 
     return PA_HOOK_OK;
 }
 
+/* Runs in PA mainloop context */
 static pa_hook_result_t device_unlink_cb(pa_core *c, pa_object *o, struct userdata *u) {
     struct service *s;
 
     pa_assert(c);
     pa_object_assert_ref(o);
 
 static pa_hook_result_t device_unlink_cb(pa_core *c, pa_object *o, struct userdata *u) {
     struct service *s;
 
     pa_assert(c);
     pa_object_assert_ref(o);
 
-    if ((s = pa_hashmap_get(u->services, o)))
+    pa_threaded_mainloop_lock(u->mainloop);
+
+    if ((s = pa_hashmap_remove(u->services, o)))
         service_free(s);
 
         service_free(s);
 
+    pa_threaded_mainloop_unlock(u->mainloop);
+
     return PA_HOOK_OK;
 }
 
 static int publish_main_service(struct userdata *u);
 
     return PA_HOOK_OK;
 }
 
 static int publish_main_service(struct userdata *u);
 
+/* Runs in Avahi mainloop context */
 static void main_entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata) {
     struct userdata *u = userdata;
     pa_assert(u);
 static void main_entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata) {
     struct userdata *u = userdata;
     pa_assert(u);
@@ -408,6 +523,7 @@ static void main_entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState s
     }
 }
 
     }
 }
 
+/* Runs in Avahi mainloop context */
 static int publish_main_service(struct userdata *u) {
     AvahiStringList *txt = NULL;
     int r = -1;
 static int publish_main_service(struct userdata *u) {
     AvahiStringList *txt = NULL;
     int r = -1;
@@ -432,7 +548,7 @@ static int publish_main_service(struct userdata *u) {
                 SERVICE_TYPE_SERVER,
                 NULL,
                 NULL,
                 SERVICE_TYPE_SERVER,
                 NULL,
                 NULL,
-                u->port,
+                compute_port(u),
                 txt) < 0) {
 
         pa_log("avahi_entry_group_add_service_strlst() failed: %s", avahi_strerror(avahi_client_errno(u->client)));
                 txt) < 0) {
 
         pa_log("avahi_entry_group_add_service_strlst() failed: %s", avahi_strerror(avahi_client_errno(u->client)));
@@ -452,6 +568,7 @@ fail:
     return r;
 }
 
     return r;
 }
 
+/* Runs in PA mainloop context */
 static int publish_all_services(struct userdata *u) {
     pa_sink *sink;
     pa_source *source;
 static int publish_all_services(struct userdata *u) {
     pa_sink *sink;
     pa_source *source;
@@ -463,12 +580,18 @@ static int publish_all_services(struct userdata *u) {
     pa_log_debug("Publishing services in Zeroconf");
 
     for (sink = PA_SINK(pa_idxset_first(u->core->sinks, &idx)); sink; sink = PA_SINK(pa_idxset_next(u->core->sinks, &idx)))
     pa_log_debug("Publishing services in Zeroconf");
 
     for (sink = PA_SINK(pa_idxset_first(u->core->sinks, &idx)); sink; sink = PA_SINK(pa_idxset_next(u->core->sinks, &idx)))
-        if (!shall_ignore(PA_OBJECT(sink)))
-            publish_service(get_service(u, PA_OBJECT(sink)));
+        if (!shall_ignore(PA_OBJECT(sink))) {
+            pa_threaded_mainloop_lock(u->mainloop);
+            pa_mainloop_api_once(u->api, publish_service, get_service(u, PA_OBJECT(sink)));
+            pa_threaded_mainloop_unlock(u->mainloop);
+        }
 
     for (source = PA_SOURCE(pa_idxset_first(u->core->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(u->core->sources, &idx)))
 
     for (source = PA_SOURCE(pa_idxset_first(u->core->sources, &idx)); source; source = PA_SOURCE(pa_idxset_next(u->core->sources, &idx)))
-        if (!shall_ignore(PA_OBJECT(source)))
-            publish_service(get_service(u, PA_OBJECT(source)));
+        if (!shall_ignore(PA_OBJECT(source))) {
+            pa_threaded_mainloop_lock(u->mainloop);
+            pa_mainloop_api_once(u->api, publish_service, get_service(u, PA_OBJECT(source)));
+            pa_threaded_mainloop_unlock(u->mainloop);
+        }
 
     if (publish_main_service(u) < 0)
         goto fail;
 
     if (publish_main_service(u) < 0)
         goto fail;
@@ -479,7 +602,8 @@ fail:
     return r;
 }
 
     return r;
 }
 
-static void unpublish_all_services(struct userdata *u, pa_bool_t rem) {
+/* Runs in Avahi mainloop context */
+static void unpublish_all_services(struct userdata *u, bool rem) {
     void *state = NULL;
     struct service *s;
 
     void *state = NULL;
     struct service *s;
 
@@ -512,6 +636,32 @@ static void unpublish_all_services(struct userdata *u, pa_bool_t rem) {
     }
 }
 
     }
 }
 
+/* Runs in PA mainloop context */
+static int avahi_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+    struct userdata *u = (struct userdata *) data;
+
+    pa_assert(u);
+
+    if (u->shutting_down)
+        return 0;
+
+    switch (code) {
+        case AVAHI_MESSAGE_PUBLISH_ALL:
+            publish_all_services(u);
+            break;
+
+        case AVAHI_MESSAGE_SHUTDOWN_START:
+            pa_module_unload(u->core, u->module, true);
+            break;
+
+        default:
+            pa_assert_not_reached();
+    }
+
+    return 0;
+}
+
+/* Runs in Avahi mainloop context */
 static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata) {
     struct userdata *u = userdata;
 
 static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata) {
     struct userdata *u = userdata;
 
@@ -522,12 +672,13 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
 
     switch (state) {
         case AVAHI_CLIENT_S_RUNNING:
 
     switch (state) {
         case AVAHI_CLIENT_S_RUNNING:
-            publish_all_services(u);
+            /* Collect all sinks/sources, and publish them */
+            pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->msg), AVAHI_MESSAGE_PUBLISH_ALL, u, 0, NULL, NULL);
             break;
 
         case AVAHI_CLIENT_S_COLLISION:
             pa_log_debug("Host name collision");
             break;
 
         case AVAHI_CLIENT_S_COLLISION:
             pa_log_debug("Host name collision");
-            unpublish_all_services(u, FALSE);
+            unpublish_all_services(u, false);
             break;
 
         case AVAHI_CLIENT_FAILURE:
             break;
 
         case AVAHI_CLIENT_FAILURE:
@@ -536,12 +687,12 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
 
                 pa_log_debug("Avahi daemon disconnected.");
 
 
                 pa_log_debug("Avahi daemon disconnected.");
 
-                unpublish_all_services(u, TRUE);
+                unpublish_all_services(u, true);
                 avahi_client_free(u->client);
 
                 if (!(u->client = avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error))) {
                     pa_log("avahi_client_new() failed: %s", avahi_strerror(error));
                 avahi_client_free(u->client);
 
                 if (!(u->client = avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error))) {
                     pa_log("avahi_client_new() failed: %s", avahi_strerror(error));
-                    pa_module_unload_request(u->module);
+                    pa_module_unload_request(u->module, true);
                 }
             }
 
                 }
             }
 
@@ -551,48 +702,75 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
     }
 }
 
     }
 }
 
+/* Runs in Avahi mainloop context */
+static void create_client(pa_mainloop_api *api PA_GCC_UNUSED, void *userdata) {
+    struct userdata *u = (struct userdata *) userdata;
+    int error;
+
+    pa_thread_mq_install(&u->thread_mq);
+
+    if (!(u->client = avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error))) {
+        pa_log("avahi_client_new() failed: %s", avahi_strerror(error));
+        goto fail;
+    }
+
+    pa_log_debug("Started Avahi threaded mainloop");
+
+    return;
+
+fail:
+    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->msg), AVAHI_MESSAGE_SHUTDOWN_START, u, 0, NULL, NULL);
+}
+
 int pa__init(pa_module*m) {
 
     struct userdata *u;
 int pa__init(pa_module*m) {
 
     struct userdata *u;
-    uint32_t port = PA_NATIVE_DEFAULT_PORT;
     pa_modargs *ma = NULL;
     pa_modargs *ma = NULL;
-    char hn[256], un[256];
-    int error;
+    char *hn, *un;
 
     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
         pa_log("Failed to parse module arguments.");
         goto fail;
     }
 
 
     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
         pa_log("Failed to parse module arguments.");
         goto fail;
     }
 
-    if (pa_modargs_get_value_u32(ma, "port", &port) < 0 || port <= 0 || port > 0xFFFF) {
-        pa_log("Invalid port specified.");
-        goto fail;
-    }
-
     m->userdata = u = pa_xnew(struct userdata, 1);
     u->core = m->core;
     u->module = m;
     m->userdata = u = pa_xnew(struct userdata, 1);
     u->core = m->core;
     u->module = m;
-    u->port = (uint16_t) port;
+    u->native = pa_native_protocol_get(u->core);
+
+    u->rtpoll = pa_rtpoll_new();
+    u->mainloop = pa_threaded_mainloop_new();
+    u->api = pa_threaded_mainloop_get_api(u->mainloop);
 
 
-    u->avahi_poll = pa_avahi_poll_new(m->core->mainloop);
+    pa_thread_mq_init(&u->thread_mq, u->core->mainloop, u->rtpoll);
+    u->msg = pa_msgobject_new(avahi_msg);
+    u->msg->parent.process_msg = avahi_process_msg;
 
 
-    u->services = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
+    u->avahi_poll = pa_avahi_poll_new(u->api);
 
 
-    u->sink_new_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_PUT], (pa_hook_cb_t) device_new_or_changed_cb, u);
-    u->sink_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_PROPLIST_CHANGED], (pa_hook_cb_t) device_new_or_changed_cb, u);
-    u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], (pa_hook_cb_t) device_unlink_cb, u);
-    u->source_new_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_PUT], (pa_hook_cb_t) device_new_or_changed_cb, u);
-    u->source_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], (pa_hook_cb_t) device_new_or_changed_cb, u);
-    u->source_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], (pa_hook_cb_t) device_unlink_cb, u);
+    u->services = pa_hashmap_new_full(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func, NULL, (pa_free_cb_t) service_free);
+
+    u->sink_new_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_PUT], PA_HOOK_LATE, (pa_hook_cb_t) device_new_or_changed_cb, u);
+    u->sink_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_PROPLIST_CHANGED], PA_HOOK_LATE, (pa_hook_cb_t) device_new_or_changed_cb, u);
+    u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_LATE, (pa_hook_cb_t) device_unlink_cb, u);
+    u->source_new_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_PUT], PA_HOOK_LATE, (pa_hook_cb_t) device_new_or_changed_cb, u);
+    u->source_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_PROPLIST_CHANGED], PA_HOOK_LATE, (pa_hook_cb_t) device_new_or_changed_cb, u);
+    u->source_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_UNLINK], PA_HOOK_LATE, (pa_hook_cb_t) device_unlink_cb, u);
 
     u->main_entry_group = NULL;
 
 
     u->main_entry_group = NULL;
 
-    u->service_name = pa_truncate_utf8(pa_sprintf_malloc("%s@%s", pa_get_user_name(un, sizeof(un)), pa_get_host_name(hn, sizeof(hn))), AVAHI_LABEL_MAX);
+    un = pa_get_user_name_malloc();
+    hn = pa_get_host_name_malloc();
+    u->service_name = pa_truncate_utf8(pa_sprintf_malloc("%s@%s", un, hn), AVAHI_LABEL_MAX-1);
+    pa_xfree(un);
+    pa_xfree(hn);
 
 
-    if (!(u->client = avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error))) {
-        pa_log("avahi_client_new() failed: %s", avahi_strerror(error));
-        goto fail;
-    }
+    pa_threaded_mainloop_set_name(u->mainloop, "avahi-ml");
+    pa_threaded_mainloop_start(u->mainloop);
+
+    pa_threaded_mainloop_lock(u->mainloop);
+    pa_mainloop_api_once(u->api, create_client, u);
+    pa_threaded_mainloop_unlock(u->mainloop);
 
     pa_modargs_free(ma);
 
 
     pa_modargs_free(ma);
 
@@ -607,6 +785,24 @@ fail:
     return -1;
 }
 
     return -1;
 }
 
+/* Runs in Avahi mainloop context.
+ *
+ * Teardown callback scheduled by pa__done() through pa_mainloop_api_once().
+ * It releases the Avahi-side state held in the module's userdata and then
+ * posts AVAHI_MESSAGE_SHUTDOWN_COMPLETE on u->thread_mq.outq, which is the
+ * message pa__done() blocks on with pa_asyncmsgq_wait_for().
+ *
+ * api:      mainloop API handle; not used here (marked PA_GCC_UNUSED).
+ * userdata: the module's struct userdata, passed as an opaque pointer.
+ */
+static void client_free(pa_mainloop_api *api PA_GCC_UNUSED, void *userdata) {
+    struct userdata *u = (struct userdata *) userdata;
+
+    pa_hashmap_free(u->services); /* map was created in pa__init with service_free as its value free callback */
+
+    if (u->main_entry_group) /* may still be NULL; pa__init sets it to NULL before the client is created */
+        avahi_entry_group_free(u->main_entry_group);
+
+    if (u->client)
+        avahi_client_free(u->client);
+
+    if (u->avahi_poll)
+        pa_avahi_poll_free(u->avahi_poll);
+
+    pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->msg), AVAHI_MESSAGE_SHUTDOWN_COMPLETE, u, 0, NULL, NULL); /* signals pa__done that teardown has finished */
+}
+
 void pa__done(pa_module*m) {
     struct userdata*u;
     pa_assert(m);
 void pa__done(pa_module*m) {
     struct userdata*u;
     pa_assert(m);
@@ -614,14 +810,18 @@ void pa__done(pa_module*m) {
     if (!(u = m->userdata))
         return;
 
     if (!(u = m->userdata))
         return;
 
-    if (u->services) {
-        struct service *s;
+    u->shutting_down = true;
 
 
-        while ((s = pa_hashmap_get_first(u->services)))
-            service_free(s);
+    pa_threaded_mainloop_lock(u->mainloop);
+    pa_mainloop_api_once(u->api, client_free, u);
+    pa_threaded_mainloop_unlock(u->mainloop);
+    pa_asyncmsgq_wait_for(u->thread_mq.outq, AVAHI_MESSAGE_SHUTDOWN_COMPLETE);
 
 
-        pa_hashmap_free(u->services, NULL, NULL);
-    }
+    pa_threaded_mainloop_stop(u->mainloop);
+    pa_threaded_mainloop_free(u->mainloop);
+
+    pa_thread_mq_done(&u->thread_mq);
+    pa_rtpoll_free(u->rtpoll);
 
     if (u->sink_new_slot)
         pa_hook_slot_free(u->sink_new_slot);
 
     if (u->sink_new_slot)
         pa_hook_slot_free(u->sink_new_slot);
@@ -636,15 +836,10 @@ void pa__done(pa_module*m) {
     if (u->source_unlink_slot)
         pa_hook_slot_free(u->source_unlink_slot);
 
     if (u->source_unlink_slot)
         pa_hook_slot_free(u->source_unlink_slot);
 
-    if (u->main_entry_group)
-        avahi_entry_group_free(u->main_entry_group);
-
-    if (u->client)
-        avahi_client_free(u->client);
-
-    if (u->avahi_poll)
-        pa_avahi_poll_free(u->avahi_poll);
+    if (u->native)
+        pa_native_protocol_unref(u->native);
 
 
+    pa_xfree(u->msg);
     pa_xfree(u->service_name);
     pa_xfree(u);
 }
     pa_xfree(u->service_name);
     pa_xfree(u);
 }