
- Complete NestJS TypeScript implementation with WebSocket support - Direct messaging (DM) and group chat functionality - End-to-end encryption with AES encryption and key pairs - Media file support (images, videos, audio, documents) up to 100MB - Push notifications with Firebase Cloud Messaging integration - Mention alerts and real-time typing indicators - User authentication with JWT and Passport - SQLite database with TypeORM entities and relationships - Comprehensive API documentation with Swagger/OpenAPI - File upload handling with secure access control - Online/offline status tracking and presence management - Message editing, deletion, and reply functionality - Notification management with automatic cleanup - Health check endpoint for monitoring - CORS configuration for cross-origin requests - Environment-based configuration management - Structured for Flutter SDK integration Features implemented: ✅ Real-time messaging with Socket.IO ✅ User registration and authentication ✅ Direct messages and group chats ✅ Media file uploads and management ✅ End-to-end encryption ✅ Push notifications ✅ Mention alerts ✅ Typing indicators ✅ Message read receipts ✅ Online status tracking ✅ File access control ✅ Comprehensive API documentation Ready for Flutter SDK development and production deployment.
282 lines
10 KiB
TypeScript
282 lines
10 KiB
TypeScript
/*
|
|
* Copyright 2022 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
import { CallCredentials } from "./call-credentials";
|
|
import { Call, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
|
|
import { SubchannelCall } from "./subchannel-call";
|
|
import { ConnectivityState } from "./connectivity-state";
|
|
import { LogVerbosity, Status } from "./constants";
|
|
import { Deadline, getDeadlineTimeoutString } from "./deadline";
|
|
import { InternalChannel } from "./internal-channel";
|
|
import { Metadata } from "./metadata";
|
|
import { PickResultType } from "./picker";
|
|
import { CallConfig } from "./resolver";
|
|
import { splitHostPort } from "./uri-parser";
|
|
import * as logging from './logging';
|
|
import { restrictControlPlaneStatusCode } from "./control-plane-status";
|
|
import * as http2 from 'http2';
|
|
|
|
const TRACER_NAME = 'load_balancing_call';
|
|
|
|
export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED';
|
|
|
|
export interface StatusObjectWithProgress extends StatusObject {
|
|
progress: RpcProgress;
|
|
}
|
|
|
|
export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
|
|
onReceiveStatus(status: StatusObjectWithProgress): void;
|
|
}
|
|
|
|
export class LoadBalancingCall implements Call {
|
|
private child: SubchannelCall | null = null;
|
|
private readPending = false;
|
|
private pendingMessage: {context: MessageContext, message: Buffer} | null = null;
|
|
private pendingHalfClose = false;
|
|
private ended = false;
|
|
private serviceUrl: string;
|
|
private metadata: Metadata | null = null;
|
|
private listener: InterceptingListener | null = null;
|
|
private onCallEnded: ((statusCode: Status) => void) | null = null;
|
|
constructor(
|
|
private readonly channel: InternalChannel,
|
|
private readonly callConfig: CallConfig,
|
|
private readonly methodName: string,
|
|
private readonly host : string,
|
|
private readonly credentials: CallCredentials,
|
|
private readonly deadline: Deadline,
|
|
private readonly callNumber: number
|
|
) {
|
|
const splitPath: string[] = this.methodName.split('/');
|
|
let serviceName = '';
|
|
/* The standard path format is "/{serviceName}/{methodName}", so if we split
|
|
* by '/', the first item should be empty and the second should be the
|
|
* service name */
|
|
if (splitPath.length >= 2) {
|
|
serviceName = splitPath[1];
|
|
}
|
|
const hostname = splitHostPort(this.host)?.host ?? 'localhost';
|
|
/* Currently, call credentials are only allowed on HTTPS connections, so we
|
|
* can assume that the scheme is "https" */
|
|
this.serviceUrl = `https://${hostname}/${serviceName}`;
|
|
}
|
|
|
|
private trace(text: string): void {
|
|
logging.trace(
|
|
LogVerbosity.DEBUG,
|
|
TRACER_NAME,
|
|
'[' + this.callNumber + '] ' + text
|
|
);
|
|
}
|
|
|
|
private outputStatus(status: StatusObject, progress: RpcProgress) {
|
|
if (!this.ended) {
|
|
this.ended = true;
|
|
this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"');
|
|
const finalStatus = {...status, progress};
|
|
this.listener?.onReceiveStatus(finalStatus);
|
|
this.onCallEnded?.(finalStatus.code);
|
|
}
|
|
}
|
|
|
|
doPick() {
|
|
if (this.ended) {
|
|
return;
|
|
}
|
|
if (!this.metadata) {
|
|
throw new Error('doPick called before start');
|
|
}
|
|
this.trace('Pick called')
|
|
const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation);
|
|
const subchannelString = pickResult.subchannel ?
|
|
'(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
|
|
'' + pickResult.subchannel;
|
|
this.trace(
|
|
'Pick result: ' +
|
|
PickResultType[pickResult.pickResultType] +
|
|
' subchannel: ' +
|
|
subchannelString +
|
|
' status: ' +
|
|
pickResult.status?.code +
|
|
' ' +
|
|
pickResult.status?.details
|
|
);
|
|
switch (pickResult.pickResultType) {
|
|
case PickResultType.COMPLETE:
|
|
this.credentials.generateMetadata({service_url: this.serviceUrl}).then(
|
|
(credsMetadata) => {
|
|
const finalMetadata = this.metadata!.clone();
|
|
finalMetadata.merge(credsMetadata);
|
|
if (finalMetadata.get('authorization').length > 1) {
|
|
this.outputStatus(
|
|
{
|
|
code: Status.INTERNAL,
|
|
details: '"authorization" metadata cannot have multiple values',
|
|
metadata: new Metadata()
|
|
},
|
|
'PROCESSED'
|
|
);
|
|
}
|
|
if (pickResult.subchannel!.getConnectivityState() !== ConnectivityState.READY) {
|
|
this.trace(
|
|
'Picked subchannel ' +
|
|
subchannelString +
|
|
' has state ' +
|
|
ConnectivityState[pickResult.subchannel!.getConnectivityState()] +
|
|
' after getting credentials metadata. Retrying pick'
|
|
);
|
|
this.doPick();
|
|
return;
|
|
}
|
|
|
|
if (this.deadline !== Infinity) {
|
|
finalMetadata.set('grpc-timeout', getDeadlineTimeoutString(this.deadline));
|
|
}
|
|
try {
|
|
this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
|
|
onReceiveMetadata: metadata => {
|
|
this.trace('Received metadata');
|
|
this.listener!.onReceiveMetadata(metadata);
|
|
},
|
|
onReceiveMessage: message => {
|
|
this.trace('Received message');
|
|
this.listener!.onReceiveMessage(message);
|
|
},
|
|
onReceiveStatus: status => {
|
|
this.trace('Received status');
|
|
if (status.rstCode === http2.constants.NGHTTP2_REFUSED_STREAM) {
|
|
this.outputStatus(status, 'REFUSED');
|
|
} else {
|
|
this.outputStatus(status, 'PROCESSED');
|
|
}
|
|
}
|
|
});
|
|
} catch (error) {
|
|
this.trace(
|
|
'Failed to start call on picked subchannel ' +
|
|
subchannelString +
|
|
' with error ' +
|
|
(error as Error).message
|
|
);
|
|
this.outputStatus(
|
|
{
|
|
code: Status.INTERNAL,
|
|
details: 'Failed to start HTTP/2 stream with error ' + (error as Error).message,
|
|
metadata: new Metadata()
|
|
},
|
|
'NOT_STARTED'
|
|
);
|
|
return;
|
|
}
|
|
this.callConfig.onCommitted?.();
|
|
pickResult.onCallStarted?.();
|
|
this.onCallEnded = pickResult.onCallEnded;
|
|
this.trace('Created child call [' + this.child.getCallNumber() + ']');
|
|
if (this.readPending) {
|
|
this.child.startRead();
|
|
}
|
|
if (this.pendingMessage) {
|
|
this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
|
|
}
|
|
if (this.pendingHalfClose) {
|
|
this.child.halfClose();
|
|
}
|
|
}, (error: Error & { code: number }) => {
|
|
// We assume the error code isn't 0 (Status.OK)
|
|
const {code, details} = restrictControlPlaneStatusCode(
|
|
typeof error.code === 'number' ? error.code : Status.UNKNOWN,
|
|
`Getting metadata from plugin failed with error: ${error.message}`
|
|
)
|
|
this.outputStatus(
|
|
{
|
|
code: code,
|
|
details: details,
|
|
metadata: new Metadata()
|
|
},
|
|
'PROCESSED'
|
|
);
|
|
}
|
|
);
|
|
break;
|
|
case PickResultType.DROP:
|
|
const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
|
|
setImmediate(() => {
|
|
this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'DROP');
|
|
});
|
|
break;
|
|
case PickResultType.TRANSIENT_FAILURE:
|
|
if (this.metadata.getOptions().waitForReady) {
|
|
this.channel.queueCallForPick(this);
|
|
} else {
|
|
const {code, details} = restrictControlPlaneStatusCode(pickResult.status!.code, pickResult.status!.details);
|
|
setImmediate(() => {
|
|
this.outputStatus({code, details, metadata: pickResult.status!.metadata}, 'PROCESSED');
|
|
});
|
|
}
|
|
break;
|
|
case PickResultType.QUEUE:
|
|
this.channel.queueCallForPick(this);
|
|
}
|
|
}
|
|
|
|
cancelWithStatus(status: Status, details: string): void {
|
|
this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
|
|
this.child?.cancelWithStatus(status, details);
|
|
this.outputStatus({code: status, details: details, metadata: new Metadata()}, 'PROCESSED');
|
|
}
|
|
getPeer(): string {
|
|
return this.child?.getPeer() ?? this.channel.getTarget();
|
|
}
|
|
start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
|
|
this.trace('start called');
|
|
this.listener = listener;
|
|
this.metadata = metadata;
|
|
this.doPick();
|
|
}
|
|
sendMessageWithContext(context: MessageContext, message: Buffer): void {
|
|
this.trace('write() called with message of length ' + message.length);
|
|
if (this.child) {
|
|
this.child.sendMessageWithContext(context, message);
|
|
} else {
|
|
this.pendingMessage = {context, message};
|
|
}
|
|
}
|
|
startRead(): void {
|
|
this.trace('startRead called');
|
|
if (this.child) {
|
|
this.child.startRead();
|
|
} else {
|
|
this.readPending = true;
|
|
}
|
|
}
|
|
halfClose(): void {
|
|
this.trace('halfClose called');
|
|
if (this.child) {
|
|
this.child.halfClose();
|
|
} else {
|
|
this.pendingHalfClose = true;
|
|
}
|
|
}
|
|
setCredentials(credentials: CallCredentials): void {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
|
|
getCallNumber(): number {
|
|
return this.callNumber;
|
|
}
|
|
}
|