import { o_assign, o_create, o_freeze, as_func } from './builtins.jsy'
import { _pi_apply, _pi_cmp } from './mfpi.jsy'
import { bind_codec } from './codec.jsy'
import { FabricRouter } from './router.jsy'
import { TargetRouter } from './targets.jsy'


// Hub object tying together a router, a local target route, URL-protocol
// based connection factories, and a plugin pipeline.
//
// Subclasses carrying plugins are produced with `FabricHub.plugin(...)`;
// instances are produced with `new Hub(...)` or `Hub.create(...)`.
export class FabricHub ::
  // Marker property; always true.
  get is_fabric_hub() :: return true

  // Factory equivalent to `new this(...args)`.
  static create(...args) :: return new this(...args)

  // Build a hub from any mix of string and object options (see _initOptions).
  //
  // NOTE: the constructor returns `self`, an object created from `this` via
  // `o_create` (presumably Object.create -- confirm in ./builtins.jsy), not
  // `this` itself. State (`options`, `router`, `loopback`, `send`,
  // `_url_protos`, `local`, `_root_`) is stored on `this`, while plugins and
  // the `_init*` hooks are invoked against `self`.
  constructor(... opt_args) ::
    let self = o_create(this)
    let options = this.options = this._initOptions({}, opt_args)

    // 'pre' plugins run before the router exists
    _pi_apply @ ['pre'], this._plugins_, self

    ::
      let router = self._initHubRouter(options)
      let {loopback} = router
      o_assign @ this, @{}
        _root_: this
        router, loopback,
        send: loopback.send,
        _url_protos: {}

    this.local = self._initLocal(options)

    // default (null) and 'post' plugins run once the hub is fully assembled
    _pi_apply @ [null, 'post'], this._plugins_, self
    return self


  // Fold `opt_args` into `options`: a string argument sets `id_route`;
  // any other argument is merged in with `o_assign`. Returns `options`.
  _initOptions(options, opt_args) ::
    for let opt of opt_args ::
      if 'string' === typeof opt ::
        options.id_route = opt
      else o_assign(options, opt)
    return options

  // Instantiate this hub class's `FabricRouter` for the hub.
  _initHubRouter(options) ::
    return new this.constructor.FabricRouter(this, options)

  // Create the hub's default local route from `options.id_route`.
  _initLocal(options) ::
    return this.localRoute(options.id_route)


  // Create a `TargetRouter` for `id_route`, generating an id via
  // `newRouteId()` when `id_route` is null or undefined. Unless `is_private`
  // is truthy the route is published on the hub router. Returns the route.
  localRoute(id_route, is_private) ::
    if null == id_route :: id_route = this.newRouteId()
    let local = new this.constructor.TargetRouter @
      id_route, this.router, this
    if ! is_private ::
      this.router.publishRoute(local)
    return local

  // Proxy for `this.router.timeouts`.
  get timeouts() :: return this.router.timeouts

  // Compose a route id as: id_prefix + id_path + random part + id_suffix,
  // where prefix and suffix come from `opt` (defaults to the hub options) and
  // missing pieces become empty strings.
  newRouteId(id_path, opt=this.options) ::
    return `${opt.id_prefix||''}${id_path||''}${this.randId(5)}${opt.id_suffix||''}`

  // Return a base-36 string derived from `Math.random()`.
  // NOTE(review): `n` is accepted but not used by this implementation, and
  // `Math.random()` is not cryptographically secure -- presumably plugins
  // replace this method; confirm before relying on id length or strength.
  randId(n) :: return Math.random().toString(36).slice(2)


  // Register `upstream` with the router as the upstream channel.
  //
  // `upstream` may be a promise. If the resolved value has no `peer_info`
  // it is passed to `connect()` first. `peer_info` is awaited before calling
  // `router.setUpstream()`.
  //
  // Returns the upstream channel.
  // Throws TypeError when `router.setUpstream()` returns a falsy value.
  async connectUpstream(upstream) ::
    upstream = await upstream
    if ! upstream.peer_info ::
      upstream = await this.connect(upstream)

    await upstream.peer_info
    let res = this.router.setUpstream(upstream)
    if ! res ::
      throw new TypeError @ 'router.setUpstream() did not accept the upstream channel'
    return upstream

  // Dispatch `conn_url` to the connect callback registered for its protocol.
  //
  // A value with a truthy `protocol` property is used as-is; otherwise it is
  // converted to a `URL` using its `asURL()` method when present, or string
  // coercion when not.
  //
  // Throws Error when no callback is registered for the protocol.
  connect(conn_url) ::
    if ! conn_url.protocol ::
      conn_url = new URL @ conn_url.asURL ? conn_url.asURL() : ''+conn_url

    let connect = this._url_protos[conn_url.protocol]
    if ! connect ::
      throw new Error @ `Connection protocol "${conn_url.protocol}" not found`
    return connect(conn_url)

  // Register `cb_connect` as the connect callback for every protocol in
  // `protocolList`. `as_func` is called on the callback first (presumably it
  // validates that it is a function -- confirm in ./builtins.jsy).
  // Returns the hub for chaining.
  registerProtocols(protocolList, cb_connect) ::
    as_func(cb_connect)
    for let protocol of protocolList ::
      this._url_protos[protocol] = cb_connect
    return this


  // --- stream codec support

  // Delegate to the module-level `bind_codec` from ./codec.jsy.
  bind_codec(codec) ::
    return bind_codec(codec)

  // Bind `codec`, store the result as `this.stream_codec`, and return it.
  as_stream_codec(codec) ::
    return this.stream_codec = this.bind_codec(codec)

  // The `codec` property of the current `stream_codec`.
  get codec() ::
    return this.stream_codec.codec


  // --- plugin support ---

  // Apply plugins to this live hub instance using the 'live', default (null)
  // and 'post' hooks. The passed plugin list is sorted in place with
  // `_pi_cmp`. Returns the hub for chaining.
  livePlugin(... _plugins_) ::
    _plugins_.sort(_pi_cmp)
    _pi_apply @ ['live', null, 'post'], _plugins_, this
    return this

  // Create a subclass of this hub class extended with `_plugins_`.
  //
  // The subclass gets a frozen, sorted static `_plugins_` list (inherited
  // plugins followed by the new ones), its own `FabricRouter` and
  // `TargetRouter` subclasses, and a mutable per-instance plugin list on
  // its prototype. 'subclass' hooks run against the new class, then 'proto'
  // hooks run against its prototype.
  static plugin(... _plugins_) ::
    _plugins_ = o_freeze @ [... this._plugins_ || [], ... _plugins_, ].sort @ _pi_cmp

    let inst_plugins = _plugins_.slice()

    class FabricHub extends this ::
    FabricHub.prototype._plugins_ = inst_plugins

    o_assign @ FabricHub, @{}
      FabricRouter: (class extends (this.FabricRouter || FabricRouter) {}),
      TargetRouter: (class extends (this.TargetRouter || TargetRouter) {}),
      _plugins_

    _pi_apply @ ['subclass'], _plugins_, FabricHub, inst_plugins

    // re-sort inst_plugins if extended during 'subclass'
    inst_plugins.sort @ _pi_cmp

    _pi_apply @ ['proto'], inst_plugins, FabricHub.prototype
    return FabricHub


/* Plugin Provided (plugin/standard or similar):

  stream_codec: @{}
    encode_pkt(pkt) : pkt_data
    decode_pkt(pkt_data) : pkt

  p2p: @{}
    hello(ms_timeout=500) : Promise of peer_info
    peerRoute(pkt, pktctx) : awaitable

*/