import { o_assign, o_create, is_array, p_then, p_res, _ref, _unref } from './builtins.jsy' import { ao_defer_v } from './timeouts.jsy' import { as_send_pkt, pkt_clone } from './as_pkt.jsy' export const local_channel = 'function' === typeof structuredClone ? local_clone_channel : local_message_channel export const channel_kinds = @{} raw_channel, raw_local_channel, send_channel, codec_channel, local_channel export const base_channel = @{} channel_id: null toJSON() :: throw new Error @ 'Not serializable' to_cbor_encode() :: throw new Error @ 'Not serializable' warn_undeliverable(pkt, mode) :: console.warn @ '~~ undeliverable', @{} mode, id_route: pkt[0], id_target: pkt[1] undeliverable(pkt, mode) :: let dedup = this._undel_seen ??= new Set() let _id = pkt[0]+','+pkt[1] if ! dedup.has(_id) :: dedup.add(_id) this.warn_undeliverable(pkt, mode) // Trigger on_sent(false) p_then(pkt.on_sent, false) return false on_send_error(err) :: console.warn @ '~~ channel send error', err addRoute(id_route, opt) :: return this.router.addPeer @ id_route, this, opt get peerRoute() :: let {p2p} = this return p2p ? p2p.peerRoute.bind(p2p) : Boolean close() :: return false async init(channel_id) :: if undefined === this.peer_info :: this.peer_info = p_res(this.ready) .then @=> this.p2p ? this.p2p.hello() : null this.peer_info.catch(Boolean) // supress 'unhandled rejection' warnings if channel_id :: this.channel_id = channel_id return await this export function raw_channel(p2p, unpack, chan_ex) :: let channel = o_assign @ o_create(this._channel_), chan_ex if null != p2p :: if ! p2p.hello || ! p2p.peerRoute :: throw new TypeError channel.p2p = p2p.initForChannel(channel) let r_disp = this.dispatch let recv = unpack ? async pkt => r_disp @ await unpack(await pkt), channel : async pkt => r_disp @ await pkt, channel return @[] recv, channel const _as_dispatch_fn = chan_dispatch => @ async (pkt, _pkt_sent) => @ _pkt_sent = !! await chan_dispatch(pkt) p_then(pkt.on_sent, _pkt_sent) _pkt_sent export function send_channel(p2p, chan_dispatch) :: let dispatch = _as_dispatch_fn(chan_dispatch) return this.raw_channel @ p2p, null, @{} dispatch send: async (...args) => dispatch @ await as_send_pkt @ args send_pkt: async pkt => dispatch @ await pkt export function codec_channel(p2p, chan_dispatch, {encode_pkt, decode_pkt}) :: // see plugin/standard/json_codec.jsy for a codec implementation let dispatch = _as_dispatch_fn(chan_dispatch) return this.raw_channel @ p2p, decode_pkt, @{} dispatch: async pkt => dispatch @ await encode_pkt @ pkt send: async (...args) => dispatch @ await encode_pkt @ as_send_pkt @ args send_pkt: async pkt => dispatch @ await encode_pkt @ pkt export function raw_local_channel({send_pkt, ... chan_ex}) :: return this.raw_channel @ null, null, @{} is_local: true, addRoute: null dispatch: this.dispatch, send: (...args) => send_pkt @ as_send_pkt(args) send_pkt ... chan_ex export function local_clone_channel() :: // use structuredClone function directly (new in 2021) let tid, [recv, channel] = this.raw_local_channel @: ref() :: tid ? _ref(tid) : tid=setInterval(Boolean, 32767) unref() :: _unref(tid) send_pkt: async pkt => await recv @ pkt_clone(pkt) return channel export function local_message_channel() :: let pkt_id=1, map = new Map() // use MessageChannel for Structured Clone let { port1, port2 } = new MessageChannel() port2.onmessage = evt => :: let [id, pkt] = evt.data let [p, resolve, reject] = map.get(id) map.delete(id) try :: let res = recv(pkt) resolve(res) catch err :: channel.on_send_error(err) reject(err) let [recv, channel] = this.raw_local_channel @: ref() :: _ref(port2) unref() :: _unref(port2) send_pkt(pkt) :: let id=pkt_id++, dp=ao_defer_v() map.set(id, dp) port1.postMessage @# id, pkt return dp[0] _unref(port2) return channel