Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 200 additions & 57 deletions packages/api/src/EmbeddedChatApi.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Rocketchat } from "@rocket.chat/sdk";
import type { IMessage } from "@rocket.chat/sdk/interfaces";
import cloneArray from "./cloneArray";
import { ROCKETCHAT_APP_ID } from "./utils/constants";
import {
Expand All @@ -9,15 +10,132 @@ import {

// mutliple typing status can come at the same time they should be processed in order.
let typingHandlerLock = 0;

type DateValue = string | number | Date;
type DdpDate = { $date: DateValue };

type MessagePayload = Omit<IMessage, "ts"> & {
rid?: string;
ts?: DateValue | DdpDate;
renderType?: string;
[key: string]: unknown;
};

type UiInteractionPayload = {
[key: string]: unknown;
};

type ActionTriggeredPayload = {
[key: string]: unknown;
};

type StreamEventName = `${string}/${string}`;

type DdpStreamMessage = {
fields?: {
eventName?: StreamEventName;
args?: unknown[] | unknown;
};
[key: string]: unknown;
};

type PasswordLoginCredentials = {
user: string;
password: string;
code?: string;
};

type OAuthServiceTokenCredentials = {
service?: string;
access_token?: string;
serviceName?: string;
accessToken?: string;
expiresIn?: number;
[key: string]: string | number | boolean | undefined;
};

type ResumeTokenCredentials = {
resume: string;
[key: string]: unknown;
};

type TokenLoginCredentials = OAuthServiceTokenCredentials | ResumeTokenCredentials;

type AutoLoginInput = {
flow: "PASSWORD" | "OAUTH" | "TOKEN";
credentials?: TokenLoginCredentials;
};

const isRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null;

const normalizeStreamArgs = (args: unknown[] | unknown | undefined): unknown[] => {
if (Array.isArray(args)) {
return args;
}
return args !== undefined ? [args] : [];
};

const normalizeMessagePayload = (value: unknown): MessagePayload | null => {
if (!isRecord(value)) {
return null;
}
const message = JSON.parse(JSON.stringify(value)) as MessagePayload;
const tsValue = message.ts;
if (isRecord(tsValue) && "$date" in tsValue) {
const dateValue = tsValue.$date;
if (
typeof dateValue === "string" ||
typeof dateValue === "number" ||
dateValue instanceof Date
) {
message.ts = dateValue;
}
}
if (!message.ts) {
message.ts = new Date().toISOString();
}
return message;
};

const normalizeTokenCredentials = (
credentials: TokenLoginCredentials
): { [key: string]: string; service: string; access_token: string } | null => {
if ("resume" in credentials) {
return null;
}

const service = credentials.service ?? credentials.serviceName;
const accessToken = credentials.access_token ?? credentials.accessToken;

if (!service || !accessToken) {
return null;
}

return {
...Object.entries(credentials).reduce<{ [key: string]: string }>(
(acc, [key, value]) => {
if (typeof value === "string") {
acc[key] = value;
}
return acc;
},
{}
),
service,
access_token: accessToken,
};
};

export default class EmbeddedChatApi {
host: string;
rid: string;
rcClient: Rocketchat;
onMessageCallbacks: ((message: any) => void)[];
onMessageCallbacks: ((message: MessagePayload) => void)[];
onMessageDeleteCallbacks: ((messageId: string) => void)[];
onTypingStatusCallbacks: ((users: string[]) => void)[];
onActionTriggeredCallbacks: ((data: any) => void)[];
onUiInteractionCallbacks: ((data: any) => void)[];
onActionTriggeredCallbacks: ((data: ActionTriggeredPayload) => void)[];
onUiInteractionCallbacks: ((data: UiInteractionPayload) => void)[];
typingUsers: string[];
auth: RocketChatAuth;

Expand Down Expand Up @@ -146,10 +264,7 @@ export default class EmbeddedChatApi {
}
}

async autoLogin(auth: {
flow: "PASSWORD" | "OAUTH" | "TOKEN";
credentials: any;
}) {
async autoLogin(auth: AutoLoginInput) {
try {
if (!auth || !auth.flow) {
return;
Expand All @@ -163,7 +278,11 @@ export default class EmbeddedChatApi {
if (!auth.credentials) {
return;
}
await this.auth.loginWithOAuthServiceToken(auth.credentials);
const tokenCredentials = normalizeTokenCredentials(auth.credentials);
if (!tokenCredentials) {
return;
}
await this.auth.loginWithOAuthServiceToken(tokenCredentials);
break;
default:
break;
Expand Down Expand Up @@ -192,18 +311,11 @@ export default class EmbeddedChatApi {
const token = (await this.auth.getCurrentUser())?.authToken;
await this.rcClient.resume({ token });
await this.rcClient.subscribeRoom(this.rid);
await this.rcClient.onMessage((data: any) => {
if (!data) {
await this.rcClient.onMessage((data: unknown) => {
const message = normalizeMessagePayload(data);
if (!message) {
return;
}
const message = JSON.parse(JSON.stringify(data));
if (message.ts?.$date) {
console.log(message.ts?.$date);
message.ts = message.ts.$date;
}
if (!message.ts) {
message.ts = new Date().toISOString();
}
this.onMessageCallbacks.map((callback) => callback(message));
});
await this.rcClient.subscribe(
Expand All @@ -212,60 +324,76 @@ export default class EmbeddedChatApi {
);
await this.rcClient.onStreamData(
"stream-notify-room",
(ddpMessage: any) => {
const [roomId, event] = ddpMessage.fields.eventName.split("/");
(_error: unknown, ddpMessage: DdpStreamMessage) => {
const eventName = ddpMessage.fields?.eventName;
if (!eventName) {
return;
}
const [roomId, event] = eventName.split("/");
const args = normalizeStreamArgs(ddpMessage.fields?.args);

if (roomId !== this.rid) {
return;
}

if (event === "user-activity") {
const typingUser = ddpMessage.fields.args[0];
const isTyping = ddpMessage.fields.args[1]?.includes("user-typing");
this.handleTypingEvent({ typingUser, isTyping });
const typingUser = args[0];
const typingStates = args[1];
const isTyping =
Array.isArray(typingStates) &&
typingStates.some((state) => state === "user-typing");
if (typeof typingUser === "string") {
this.handleTypingEvent({ typingUser, isTyping });
}
}

if (event === "typing") {
const typingUser = ddpMessage.fields.args[0];
const isTyping = ddpMessage.fields.args[1];
this.handleTypingEvent({ typingUser, isTyping });
const typingUser = args[0];
const isTyping = args[1];
if (typeof typingUser === "string" && typeof isTyping === "boolean") {
this.handleTypingEvent({ typingUser, isTyping });
}
}
if (event === "deleteMessage") {
const messageId = ddpMessage.fields.args[0]?._id;
this.onMessageDeleteCallbacks.map((callback) =>
callback(messageId)
);
const firstArg = args[0];
const messageId =
isRecord(firstArg) && typeof firstArg._id === "string"
? firstArg._id
: null;
if (messageId) {
this.onMessageDeleteCallbacks.map((callback) => callback(messageId));
}
}
}
);
await this.rcClient.subscribeNotifyUser();
await this.rcClient.onStreamData(
"stream-notify-user",
(ddpMessage: any) => {
const [, event] = ddpMessage.fields.eventName.split("/");
const args: any[] = ddpMessage.fields.args
? Array.isArray(ddpMessage.fields.args)
? ddpMessage.fields.args
: [ddpMessage.fields.args]
: [];
(_error: unknown, ddpMessage: DdpStreamMessage) => {
const eventName = ddpMessage.fields?.eventName;
if (!eventName) {
return;
}
const [, event] = eventName.split("/");
const args = normalizeStreamArgs(ddpMessage.fields?.args);
if (event === "message") {
const data = args[0];
if (!data || data?.rid !== this.rid) {
if (!isRecord(data) || data.rid !== this.rid) {
return;
}
const message = JSON.parse(JSON.stringify(data));
if (message.ts?.$date) {
message.ts = message.ts.$date;
}
if (!message.ts) {
message.ts = new Date().toISOString();
const message = normalizeMessagePayload(data);
if (!message) {
return;
}
message.renderType = "blocks";
this.onMessageCallbacks.map((callback) => callback(message));
} else if (event === "uiInteraction") {
this.onUiInteractionCallbacks.forEach((callback) =>
callback(args[0])
);
const uiInteractionData = args[0];
if (isRecord(uiInteractionData)) {
this.onUiInteractionCallbacks.forEach((callback) =>
callback(uiInteractionData)
);
}
}
}
);
Expand All @@ -274,7 +402,7 @@ export default class EmbeddedChatApi {
}
}

async addMessageListener(callback: (message: any) => void) {
async addMessageListener(callback: (message: MessagePayload) => void) {
const idx = this.onMessageCallbacks.findIndex((c) => c === callback);
if (idx !== -1) {
this.onMessageCallbacks[idx] = callback;
Expand All @@ -283,7 +411,7 @@ export default class EmbeddedChatApi {
}
}

async removeMessageListener(callback: (message: any) => void) {
async removeMessageListener(callback: (message: MessagePayload) => void) {
this.onMessageCallbacks = this.onMessageCallbacks.filter(
(c) => c !== callback
);
Expand Down Expand Up @@ -319,7 +447,9 @@ export default class EmbeddedChatApi {
);
}

async addActionTriggeredListener(callback: (data: any) => void) {
async addActionTriggeredListener(
callback: (data: ActionTriggeredPayload) => void
) {
const idx = this.onActionTriggeredCallbacks.findIndex(
(c) => c === callback
);
Expand All @@ -330,13 +460,15 @@ export default class EmbeddedChatApi {
}
}

async removeActionTriggeredListener(callback: (data: any) => void) {
async removeActionTriggeredListener(
callback: (data: ActionTriggeredPayload) => void
) {
this.onActionTriggeredCallbacks = this.onActionTriggeredCallbacks.filter(
(c) => c !== callback
);
}

async addUiInteractionListener(callback: (data: any) => void) {
async addUiInteractionListener(callback: (data: UiInteractionPayload) => void) {
const idx = this.onUiInteractionCallbacks.findIndex((c) => c === callback);
if (idx !== -1) {
this.onUiInteractionCallbacks[idx] = callback;
Expand All @@ -345,7 +477,9 @@ export default class EmbeddedChatApi {
}
}

async removeUiInteractionListener(callback: (data: any) => void) {
async removeUiInteractionListener(
callback: (data: UiInteractionPayload) => void
) {
this.onUiInteractionCallbacks = this.onUiInteractionCallbacks.filter(
(c) => c !== callback
);
Expand Down Expand Up @@ -698,8 +832,14 @@ export default class EmbeddedChatApi {
* @param {*} message should be a string or an rc message object
* Refer https://developer.rocket.chat/reference/api/schema-definition/message#message-object
*/
async sendMessage(message: any, threadId: string) {
const messageObj =
async sendMessage(
message: string | (Record<string, unknown> & { msg?: string }),
threadId: string
) {
const messageObj: { rid: string; msg?: string; tmid?: string } & Record<
string,
unknown
> =
typeof message === "string"
? {
rid: this.rid,
Expand Down Expand Up @@ -1135,7 +1275,10 @@ export default class EmbeddedChatApi {
}
}

async handleUiKitInteraction(appId: string, userInteraction: any) {
async handleUiKitInteraction(
appId: string,
userInteraction: UiInteractionPayload
) {
try {
const { userId, authToken } = (await this.auth.getCurrentUser()) || {};

Expand All @@ -1157,7 +1300,7 @@ export default class EmbeddedChatApi {
}
);

const interaction = await response.json();
const interaction = (await response.json()) as ActionTriggeredPayload;
this.onActionTriggeredCallbacks.forEach((cb) => cb(interaction));
return interaction;
} catch (e) {
Expand Down