import { o_assign, o_create } from './builtins.jsy'
import { bindCoreTargetAPI } from './targets_core.jsy'

// Per-route collection of packet targets.
//
// The constructor copies `timeouts`, `send` and `loopback` from
// `router.router_ctx` and `randId` from `hub` onto the instance, then --
// unless `_skip_bind_api` is truthy -- mixes in the API object returned by
// `bindCoreTargetAPI`.
export class TargetRouter ::
  constructor(id_route, router, hub) ::
    const {randId} = hub
    const {timeouts, send, loopback} = router.router_ctx
    o_assign(this, {timeouts, send, loopback, randId})

    // Subclasses may opt out of the core target API binding entirely.
    if this._skip_bind_api ::
      return this

    // NOTE(review): `o_create(this)` presumably yields an object whose
    // prototype is `this` -- confirm in ./builtins.jsy. That derived object
    // is what `new TargetRouter(...)` evaluates to, while the bound API is
    // assigned onto `this`.
    const derived = o_create(this)
    const core_api = bindCoreTargetAPI(derived, this._initTargets(), id_route, router)
    o_assign(this, core_api)
    return derived

  // Storage handed to bindCoreTargetAPI; a fresh Map by default.
  _initTargets() ::
    return new Map()

  // Shared error sink: logs the scope label and the error to the console.
  _on_error(scope, err) ::
    console.error('target', scope, err)

  // Reports an error under the 'tgt_dispatch' scope.
  on_dispatch_error(err) ::
    this._on_error('tgt_dispatch', err)

  // Each access yields a new closure reporting under the 'stream' scope.
  get _on_stream_error() ::
    return err => this._on_error('stream', err)

  // Asks `randId` (taken from the hub) for an id, passing 4 as its argument.
  newTargetId() ::
    return this.randId(4)

  // Intentionally empty here.
  discoverTarget(id_target, pktctx) ::
    // see plugins/discovery

  /* // from bindCoreTargetAPI

    ready : Promise

    addTarget(id_target, target, opt) : [id_route, id_target]
    hasTarget(id_target) : Boolean
    getTarget(id_target) : function(pkt, pktctx)
    removeTarget(id_target) : Boolean
    removeTarget([id_route, id_target]) : Boolean

    addDefer_v(target, inc_pktctx, dp=ao_defer_v()) : {id, promise}

    xstream(id_target, xapi) : tgt_stream_obj
      where tgt_stream_obj is {__proto__: xapi, id, stream, abort, when_done, tgt_router}
  */

  // Registers a deferred target via `addDefer_v(target, false)` and wraps
  // its promise with `timeouts.absent(absent, ms, promise)`.
  // Returns the deferred-target object, with `promise` replaced by the
  // wrapped promise.
  addOnce(ms, absent, target=true) ::
    const once = this.addDefer_v(target, false)

    // Replace the promise with the timeout-guarded one from timeouts.absent.
    once.promise = this.timeouts.absent(absent, ms, once.promise)

    // Ensure the root deferred is resolved or rejected once the guarded
    // promise fulfills.
    once.promise.then(once.abort)
    return once

  // Opens a stream target with `xstream(id_target)`, starts `async_target`
  // on it, and stores the resulting promise (with `on_error` attached as its
  // rejection handler) in `when_done`. Returns the stream target object.
  addStream(id_target, async_target, on_error=this._on_stream_error) ::
    const stream_tgt = this.xstream(id_target)
    const running = async_target(stream_tgt)
    stream_tgt.when_done = running.catch(on_error)
    return stream_tgt

  // Awaits `p_done`, then removes `id_target` (when truthy) whether or not
  // `p_done` rejected. Error instances thrown along the way are reported
  // under the 'after' scope; any other thrown value is dropped.
  async removeAfter(id_target, p_done) ::
    try ::
      try ::
        await p_done
      finally ::
        if id_target ::
          this.removeTarget(id_target)
    catch failure ::
      if failure instanceof Error ::
        this._on_error('after', failure)