import { as_func, p_then } from './builtins.jsy' import { ao_defer_v, ao_push_stream } from './timeouts.jsy' const xpkt_body = pkt => pkt.body export function bindCoreDispatchTarget(tgt_router, targets_map, id_route, router) :: // as closures over private variables (targets_map) return @{} ready: _add_target_route(id_route) async function target_route(pkt, pktctx) :: let id_target = pkt[1] || '' let target = targets_map.get(id_target) if undefined === target :: if pkt.bcast :: return // don't discover/warn on broadcast packet await tgt_router.discoverTarget(id_target, pktctx) target = targets_map.get(id_target) if undefined === target :: let channel = pktctx.channel return channel && channel.undeliverable(pkt, 'target') // do not await _target_dispatch so handlers do not block other messages _target_dispatch(target, pkt, pktctx) return true async function _target_dispatch(target, pkt, pktctx) :: // No errors may pass -- send them to tgt_router.on_dispatch_error try :: pktctx.tgt_router = tgt_router // Trigger on_sent for internal hub routing p_then(pkt.on_sent, true) await target(pkt, pktctx) catch err :: tgt_router.on_dispatch_error @ err, {pkt, pktctx} finally :: pktctx.tgt_router = undefined // release tgt_router referenece async function _add_target_route(id_route) :: if 'string' !== typeof id_route :: id_route = await id_route tgt_router.id_route = target_route.id_route = id_route router.addRoute @ id_route, target_route, true return true export function bindCoreTargetAPI(tgt_router, targets_map, id_route, router) :: let as_id_target = id => id.trim ? id : id[1] let expire_fin = 'function' !== typeof FinalizationRegistry ? null : new FinalizationRegistry @\ id_target :: targets_map.delete(id_target) return @{} addTarget hasTarget: id => targets_map.has @ as_id_target(id) getTarget: id => targets_map.get @ as_id_target(id) removeTarget: id => targets_map.delete @ as_id_target(id) expireWith addDefer_v xstream ... bindCoreDispatchTarget @ tgt_router, targets_map, id_route, router function addTarget(id_target, target) :: let id = @[] id_route, id_target || tgt_router.newTargetId() // .id comaptibility mirroring xstream() return value Object.defineProperty @ id, 'id', {value: id} if null != target :: targets_map.set @ id[1], as_func(target) return id function expireWith(id, ...args) :: if ! expire_fin :: return false id = as_id_target(id) for let obj of args :: expire_fin.register(obj, id) return true function addDefer_v(target=true, inc_pktctx=false, dp=ao_defer_v()) :: let id = addTarget @ null, ! inc_pktctx ? dp[1] // resolve with just the packet : (...zpkt) => dp[1](...zpkt) // resolve with the args array let p = dp[0] tgt_router.removeAfter(id, p) if target :: p = p.then(true === target ? xpkt_body : target) expireWith(id, p) return @{} id, 0: id[0], 1: id[1], promise: p, abort: dp[2] function xstream(id_target, xapi) :: let {stream, when_done, abort, push} = ao_push_stream(true) let id = addTarget(id_target, push) tgt_router.removeAfter(id, when_done) expireWith(id, stream) return @{} __proto__: xapi, id, 0: id[0], 1: id[1], stream, abort, when_done, tgt_router