import { as_func } from './builtins.jsy'
import { ao_track_when } from './timeouts.jsy'

// Build the packet dispatch machinery for a hub router.
//
//   hub_router -- router object; this block uses its .local_channel(),
//                 .timeouts, .discoverRoute(), .upstreamRoute(),
//                 ._on_error() and .on_dispatch_error() members
//   hub        -- owning hub; only stored on router_ctx here
//   routes_map -- Map-like store of id_route => route function
//
// Returns {dispatch, resolveRoute, loopback, router_ctx}.
export function bindCoreDispatchRouter(hub_router, hub, routes_map) ::
  // to create loopback, shim .dispatch onto a temporary prototype
  const loopback = ({ dispatch, __proto__: hub_router}).local_channel()

  // Shared context; it is the prototype of every per-packet context.
  const router_ctx = @{}
    hub, hub_router, resolveRoute,
    timeouts: hub_router.timeouts,
    loopback, send: loopback.send,
    // to(id) returns a sender that prepends `id` to the send() arguments
    to(id) :: return (...z) => this.send(id, ...z)

  // Prototype for the per-packet context objects created by dispatch()
  const pktctx0 = @{}
    __proto__: router_ctx,
    // get send_direct() :: return this.channel.send
    redispatch: dispatch_one

    // Hand the packet back to its channel as undeliverable, when a channel is present
    undeliverable() ::
      let {channel, pkt} = this
      return channel && channel.undeliverable(pkt, 'route')

    // Install (truthy id_reply) or clear (falsy id_reply) the .reply / .done
    // members on this context; always returns the context itself.
    with_reply(id_reply) ::
      if id_reply ::
        this.reply = this.to(id_reply)
        this.done = this.with_reply
      else ::
        delete this.reply
        delete this.done
      return this

  return @{} dispatch, resolveRoute, loopback, router_ctx

  // Entry point: wrap pkt and channel in a fresh packet context and dispatch it.
  function dispatch(pkt, channel) ::
    return dispatch_one @ pkt, @{} channel, pkt, __proto__: pktctx0

  // Route a single packet (or promise of a packet) to its route function.
  // Resolves to the route function's result, or to undefined when the packet
  // is nullish, is an unroutable broadcast, or an error was reported.
  async function dispatch_one(pkt, pktctx) ::
    pkt = await pkt
    if undefined === pkt || null === pkt :: return

    let id_route = pkt[0] || ''
    // an empty id_route falls back to the channel's peerRoute, if any
    let route = id_route ? routes_map.get(id_route) : (pktctx.channel || {}).peerRoute

    if undefined === route ::
      if pkt.bcast ::
        return // don't discover/warn on broadcast packet

      try ::
        route = await resolveRoute(id_route, true, true)

        if undefined === route ::
          return pktctx.undeliverable()
      catch err ::
        hub_router._on_error @ 'hub_dispatch_one', err, {pkt, pktctx}
        // There is no route to invoke after a failed resolve. Stop here so
        // the failure is reported exactly once, instead of falling through,
        // calling `undefined` and reporting a misleading TypeError.
        pktctx.channel = undefined // release channel reference
        return

    // No errors may pass -- send all errors to hub_router.on_dispatch_error
    try ::
      return await route(pkt, pktctx)
    catch err ::
      hub_router.on_dispatch_error @ err, {pkt, pktctx}
    finally ::
      pktctx.channel = undefined // release channel reference

  // Synchronous lookup: exact id_route first, then each shorter prefix
  // obtained by cutting at the last remaining '.' separator.
  function _resolveRoute0(id_route) ::
    let route = routes_map.get(id_route)
    if undefined !== route :: return route

    // search for shared path routes based on '.' separator
    let idx = id_route.lastIndexOf('.')
    while -1 !== idx ::
      route = routes_map.get @ id_route.slice(0, idx)
      if undefined !== route :: return route

      // lastIndexOf() clamps a negative position to 0, so a leading '.'
      // would be found again forever; end the search once index 0 was tried.
      idx = 0 < idx ? id_route.lastIndexOf('.', idx - 1) : -1
    return route

  // Resolve id_route to a route function, or undefined when none is found.
  //   allowDiscover -- when truthy, await hub_router.discoverRoute() and retry
  //   allowUpstream -- when truthy, await hub_router.upstreamRoute() and retry
  //                    (exact id_route match only)
  async function resolveRoute(id_route, allowDiscover, allowUpstream) ::
    let route = _resolveRoute0(id_route)
    if undefined !== route :: return route

    if allowDiscover ::
      await hub_router.discoverRoute(id_route, router_ctx)
      route = _resolveRoute0(id_route)
      if undefined !== route :: return route

    if allowUpstream ::
      // otherwise send upstream
      await hub_router.upstreamRoute(id_route)
      route = routes_map.get(id_route)

    return route

// Build the route management API over routes_map and merge in the members
// returned by bindCoreDispatchRouter() for the same arguments.
export function bindCoreRouterAPI(hub_router, hub, routes_map) ::
  // Allow individual queries but not enumeration
  let _use_override = {override: true}
  let _use_existing = {override: false}
  let db_when = ao_track_when()

  return @{}
    whenRoute: db_when.get

    // Register `route` at id_route.
    //   opt -- true means {override: true}; a falsy value means
    //          {override: false}; otherwise an options object that may carry
    //          .override, .ms_ttl and .ttl_reset
    // Returns undefined when an existing route is kept, {route, cancel} when
    // no opt.ms_ttl is given, or the ttl object (with .route set) otherwise.
    addRoute(id_route, route, opt) ::
      as_func(route) // presumably validates that route is callable -- confirm in builtins.jsy
      opt = true === opt ? _use_override : opt || _use_existing
      if ! opt.override && routes_map.has(id_route) ::
        return

      routes_map.set @ id_route, route
      let cancel = @=> this.removeRoute @ id_route, route
      db_when.define @ id_route, id_route

      if ! opt.ms_ttl ::
        return @{} route, cancel
      else ::
        let ttl = hub_router.timeouts.ttl(opt.ms_ttl, cancel)
        // NOTE(review): `route` is reassigned here after routes_map.set() and
        // after `cancel` closed over it. If with_reset() returns a new
        // function, the map still holds the unwrapped route and cancel() will
        // not match it in removeRoute() -- confirm against timeouts.jsy.
        ttl.route = false === opt.ttl_reset ? route : route = ttl.with_reset(route)
        return ttl

    // Register the route found at id_route under id_alias as well.
    // The default alias is id_route with its final non-word-character +
    // word-characters suffix removed. Returns id_alias.
    aliasRoute(id_route, id_alias=id_route.split(/\W\w+$/)[0]) ::
      let route = as_func(routes_map.get(id_route))
      if id_alias ::
        routes_map.set @ id_alias, route
      return id_alias

    // Remove `route` and every alias that maps to the same function.
    // With a single argument, that argument is unpacked: its .route (or the
    // argument itself) is the route, and its .id_route (or element [0]) is
    // the id. Returns false when a non-null route is not the one currently
    // registered at id_route; otherwise returns true.
    removeRoute(id_route, route) ::
      if 1 === arguments.length ::
        route = id_route.route || id_route
        id_route = id_route.id_route || id_route[0]

      // remove only if route is currently at id_route
      if null != route && route !== routes_map.get(id_route) ::
        return false

      // remove all aliases of route
      for let [id, fn] of routes_map.entries() ::
        if route === fn ::
          routes_map.delete @ id
      return true

    // Direct lookup of the route registered at exactly id_route
    getRoute(id_route) ::
      return routes_map.get(id_route)

    // True when a route is registered at exactly id_route
    hasRoute(id_route) ::
      return routes_map.has(id_route)

    ... bindCoreDispatchRouter @ hub_router, hub, routes_map