shawon_majid

transport-v2.ts

Nov 12th, 2025
455
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
TypeScript 9.72 KB | Source Code | 0 0
  1. /* __LC_ALLOW_ENTRYPOINT_SIDE_EFFECTS__ */
  2.  
  3. 'use client';
  4.  
  5. import type { Command, Config } from '@langchain/langgraph-sdk';
  6. import type {
  7.   BagTemplate,
  8.   GetConfigurableType,
  9.   GetUpdateType,
  10.   UseStreamTransport,
  11. } from './types';
  12.  
  13. export type IpcRendererLike = {
  14.   on: (
  15.     channel: string,
  16.     listener: (event: unknown, payload: unknown) => void,
  17.   ) => void;
  18.   off?: (
  19.     channel: string,
  20.     listener: (event: unknown, payload: unknown) => void,
  21.   ) => void;
  22.   send?: (channel: string, payload: unknown) => void;
  23.   invoke?: (channel: string, payload: unknown) => Promise<unknown>;
  24. };
  25.  
  26. export type IpcStreamTransportOptions = {
  27.   /**
  28.    * Channel to request starting a stream.
  29.    * The main process should start producing events on the provided event channel.
  30.    */
  31.   requestChannel: string;
  32.  
  33.   /**
  34.    * Base name for the dedicated per-run event channel.
  35.    * The renderer will listen on `${eventsBaseChannel}:${runId}`.
  36.    */
  37.   eventsBaseChannel: string;
  38.  
  39.   /**
  40.    * Optional channel to request cancellation of an in-flight stream.
  41.    * The renderer will send `{ runId }` when the AbortSignal triggers.
  42.    */
  43.   cancelChannel?: string;
  44.  
  45.   /**
  46.    * IPC surface exposed from preload.
  47.    */
  48.   ipc: IpcRendererLike;
  49.  
  50.   /**
  51.    * When using `send` (fire-and-forget), set a sentinel event name that indicates
  52.    * completion of the stream so the generator can finish and clean up.
  53.    * If `invoke` is available, this is not required (completion is inferred when
  54.    * the invoke handler resolves).
  55.    * Defaults to `"__complete__"`.
  56.    */
  57.   completeEventName?: string;
  58.  
  59.   /**
  60.    * Optional callback when thread metadata is received
  61.    */
  62.   onThreadId?: (threadId: string) => void;
  63. };
  64.  
  65. /**
  66.  * IPC transport for useStreamCustom that streams events over Electron IPC instead of HTTP/SSE.
  67.  *
  68.  * Expected main-process behavior:
  69.  * - Handle `requestChannel` with payload `{ runId, eventChannel, input, context, command, config }`
  70.  * - Emit stream events on `eventChannel` with shape `{ event, data, id? }`
  71.  * - Optionally handle `cancelChannel` with payload `{ runId }` to cancel
  72.  * - If using fire-and-forget `send`, emit a final sentinel event named `completeEventName`
  73.  *   (default `"__complete__"`) so the generator can complete.
  74.  *
  75.  * This class does not depend on Electron directly; pass a preload-exposed `ipc` adapter.
  76.  */
  77. export class IpcStreamTransport<
  78.   StateType extends Record<string, unknown> = Record<string, unknown>,
  79.   Bag extends BagTemplate = BagTemplate,
  80. > implements UseStreamTransport<StateType, Bag>
  81. {
  82.   // eslint-disable-next-line no-useless-constructor, no-empty-function
  83.   constructor(private readonly options: IpcStreamTransportOptions) {}
  84.  
  85.   async stream(payload: {
  86.     input: GetUpdateType<Bag, StateType> | null | undefined;
  87.     context: GetConfigurableType<Bag> | undefined;
  88.     command: Command | undefined;
  89.     config: (Config & { configurable?: GetConfigurableType<Bag> }) | undefined;
  90.     signal: AbortSignal;
  91.   }): Promise<AsyncGenerator<{ id?: string; event: string; data: unknown }>> {
  92.     const runId = crypto.randomUUID();
  93.     const eventChannel = `${this.options.eventsBaseChannel}:${runId}`;
  94.     const completeEventName = this.options.completeEventName ?? '__complete__';
  95.  
  96.     // Internal event queue for the async generator bridge
  97.     const queue: Array<{ id?: string; event: string; data: unknown }> = [];
  98.     let nextResolver:
  99.       | ((
  100.           value: IteratorResult<{ id?: string; event: string; data: unknown }>,
  101.         ) => void)
  102.       | null = null;
  103.     let done = false;
  104.  
  105.     const resolveNextIfWaiting = () => {
  106.       if (nextResolver && queue.length) {
  107.         const r = nextResolver;
  108.         nextResolver = null;
  109.         r({ value: queue.shift()!, done: false });
  110.       }
  111.     };
  112.  
  113.     const pushEvent = (evt: { id?: string; event: string; data: unknown }) => {
  114.       queue.push(evt);
  115.       resolveNextIfWaiting();
  116.     };
  117.  
  118.     let removeListener = () => {};
  119.  
  120.     const finish = () => {
  121.       if (done) return;
  122.       done = true;
  123.       try {
  124.         removeListener();
  125.       } catch {
  126.         // ignore
  127.       }
  128.       if (nextResolver) {
  129.         const r = nextResolver;
  130.         nextResolver = null;
  131.         r({ value: undefined as never, done: true });
  132.       }
  133.     };
  134.  
  135.     const listener = (_ipcEvent: unknown, raw: unknown) => {
  136.       // Expecting payload shape `{ event: string, data: unknown, id?: string }`
  137.       const msg = raw as { id?: string; event?: unknown; data?: unknown };
  138.       if (!msg || typeof msg.event !== 'string') return;
  139.  
  140.       // Intercept completion sentinel for fire-and-forget mode
  141.       if (msg.event === completeEventName) {
  142.         finish();
  143.         return;
  144.       }
  145.  
  146.       // Capture thread_id from thread or metadata events
  147.       if (this.options.onThreadId) {
  148.         if (msg.event === 'thread' && msg.data) {
  149.           const threadId = (msg.data as { thread_id?: string }).thread_id;
  150.           if (threadId) {
  151.             this.options.onThreadId(threadId);
  152.           }
  153.         } else if (msg.event === 'metadata' && msg.data) {
  154.           const threadId = (msg.data as { thread_id?: string }).thread_id;
  155.           if (threadId) {
  156.             this.options.onThreadId(threadId);
  157.           }
  158.         }
  159.       }
  160.  
  161.       pushEvent({ id: msg.id, event: msg.event, data: msg.data });
  162.     };
  163.  
  164.     const getErrorName = (e: unknown, fallback: string): string => {
  165.       if (e && typeof e === 'object' && 'name' in e) {
  166.         const n = (e as { name?: unknown }).name;
  167.         if (typeof n === 'string') return n;
  168.       }
  169.       return fallback;
  170.     };
  171.     const getErrorMessage = (e: unknown, fallback: string): string => {
  172.       if (e && typeof e === 'object' && 'message' in e) {
  173.         const m = (e as { message?: unknown }).message;
  174.         if (typeof m === 'string') return m;
  175.       }
  176.       return fallback;
  177.     };
  178.  
  179.     const abort = () => {
  180.       if (done) return;
  181.       // Request cancellation upstream first
  182.       if (this.options.cancelChannel) {
  183.         const cancelPayload = { runId };
  184.         if (this.options.ipc.invoke) {
  185.           // eslint-disable-next-line no-void
  186.           void this.options.ipc.invoke(
  187.             this.options.cancelChannel,
  188.             cancelPayload,
  189.           );
  190.         } else {
  191.           this.options.ipc.send?.(this.options.cancelChannel, cancelPayload);
  192.         }
  193.       }
  194.       finish();
  195.     };
  196.  
  197.     if (payload.signal.aborted) {
  198.       abort();
  199.     } else {
  200.       payload.signal.addEventListener('abort', abort, { once: true });
  201.     }
  202.  
  203.     // Subscribe to the dedicated event channel
  204.     this.options.ipc.on(eventChannel, listener);
  205.     removeListener = () => this.options.ipc.off?.(eventChannel, listener);
  206.  
  207.     // Start the stream in main: prefer invoke so we can infer completion when it resolves
  208.     const startPayload = {
  209.       runId,
  210.       eventChannel,
  211.       input: payload.input,
  212.       context: payload.context,
  213.       command: payload.command,
  214.       config: payload.config,
  215.     };
  216.  
  217.     try {
  218.       if (this.options.ipc.invoke) {
  219.         // eslint-disable-next-line no-void
  220.         void this.options.ipc
  221.           .invoke(this.options.requestChannel, startPayload)
  222.           .catch((error: unknown) => {
  223.             // Surface as stream error event to consumers
  224.             pushEvent({
  225.               event: 'error',
  226.               data: {
  227.                 error: getErrorName(error, 'InvokeError'),
  228.                 message: getErrorMessage(error, 'Stream invocation failed'),
  229.               },
  230.             });
  231.           })
  232.           .finally(() => {
  233.             // Completion of invoke means the stream is finished
  234.             finish();
  235.           });
  236.       } else {
  237.         // Fire-and-forget start; rely on completion sentinel event
  238.         this.options.ipc.send?.(this.options.requestChannel, startPayload);
  239.       }
  240.     } catch (error) {
  241.       pushEvent({
  242.         event: 'error',
  243.         data: {
  244.           error: getErrorName(error, 'StartError'),
  245.           message: getErrorMessage(error, 'Failed to start stream'),
  246.         },
  247.       });
  248.       // If starting failed synchronously, finish the generator
  249.       finish();
  250.     }
  251.  
  252.     const waitForNext = () =>
  253.       new Promise<
  254.         IteratorResult<{ id?: string; event: string; data: unknown }>
  255.       >((resolve) => {
  256.         nextResolver = resolve;
  257.       });
  258.  
  259.     async function* generator() {
  260.       try {
  261.         // Stream until completion
  262.         // eslint-disable-next-line no-constant-condition
  263.         while (true) {
  264.           if (done) return;
  265.           if (queue.length) {
  266.             yield queue.shift()!;
  267.             // eslint-disable-next-line no-continue
  268.             continue;
  269.           }
  270.           // eslint-disable-next-line no-await-in-loop
  271.           const next = await waitForNext();
  272.           if (next.done) return;
  273.           yield next.value!;
  274.         }
  275.       } finally {
  276.         // Ensure cleanup if consumer breaks early
  277.         finish();
  278.       }
  279.     }
  280.  
  281.     return generator();
  282.   }
  283. }
  284.  
  285. /**
  286.  * Minimal usage example (renderer):
  287.  *
  288.  * const [threadId, setThreadId] = useState<string | null>(null);
  289.  *
  290.  * const transport = new IpcStreamTransport({
  291.  *   ipc: window.electron.ipc, // preload-exposed
  292.  *   requestChannel: "lg/stream/start",
  293.  *   eventsBaseChannel: "lg/stream/events",
  294.  *   cancelChannel: "lg/stream/cancel",
  295.  *   onThreadId: (newThreadId) => setThreadId(newThreadId),
  296.  *   // completeEventName: "__complete__", // only for fire-and-forget send mode
  297.  * });
  298.  *
  299.  * const { submit, values, messages, isLoading, stop } = useStream({
  300.  *   transport,
  301.  *   messagesKey: "messages",
  302.  *   initialValues: { messages: [] },
  303.  * });
  304.  *
  305.  * // Use threadId in context when submitting messages
  306.  * await submit({ messages: [userMessage] }, { context: { threadId } });
  307.  */
  308.  
Advertisement
Add Comment
Please, Sign In to add comment