Building Real-Time Trading Platforms with NestJS and WebSockets
by Hamzah Ejaz, Software Engineer
Building Henly, an agricultural trading platform, taught me critical lessons about real-time systems, WebSocket architecture, and managing complex business logic at scale.
System Requirements
The Challenge:
- Support 800+ concurrent users
- Real-time inventory updates
- Dynamic pricing engine
- Order matching system
- Payment processing
- Delivery tracking
Architecture Overview
NestJS Modular Design
// app.module.ts
/**
 * Root module: registers configuration, the MongoDB connection and the
 * feature modules listed in `imports`.
 */
@Module({
imports: [
// Called without options here.
ConfigModule.forRoot(),
// The connection string is read from the MONGODB_URI environment variable.
// NOTE(review): `process.env.MONGODB_URI` is `string | undefined` — confirm it is always set at startup.
MongooseModule.forRoot(process.env.MONGODB_URI),
AuthModule,
ProductsModule,
OrdersModule,
PaymentsModule,
WebSocketsModule,
],
})
export class AppModule {}
WebSocket Gateway
/**
 * WebSocket gateway for the `trading` namespace.
 *
 * On connect, each client joins a per-user room (`user:<id>`) and receives the
 * currently available inventory. An `order:place` message creates an order,
 * notifies the seller's room and broadcasts an inventory change to every
 * connected client.
 */
@WebSocketGateway({
  // NOTE(review): `origin: '*'` accepts connections from any origin — confirm
  // this is intended outside development.
  cors: { origin: '*' },
  namespace: 'trading',
})
export class TradingGateway implements OnGatewayConnection {
  @WebSocketServer()
  server: Server

  constructor(
    private readonly inventoryService: InventoryService,
    private readonly orderService: OrderService,
  ) {}

  /**
   * Authenticates a new socket, joins it to its user room and sends the
   * initial inventory snapshot.
   *
   * Any failure (token lookup throwing, no user resolved, inventory fetch
   * failing) closes the socket instead of leaving a half-initialised
   * connection or an unhandled promise rejection.
   *
   * @param client The socket that just connected.
   */
  async handleConnection(client: Socket) {
    try {
      // NOTE(review): getUserFromToken is not defined in this snippet; it is
      // assumed to return the user id for a valid token.
      const userId = this.getUserFromToken(client.handshake.auth.token)
      if (!userId) {
        // Without this check the socket would join the literal room `user:undefined`.
        client.disconnect(true)
        return
      }
      await client.join(`user:${userId}`)

      // Send current inventory
      const inventory = await this.inventoryService.getAvailable()
      client.emit('inventory:initial', inventory)
    } catch {
      client.disconnect(true)
    }
  }

  /**
   * Handles an `order:place` message.
   *
   * @param client The socket that sent the message.
   * @param orderData Order payload supplied by the client.
   * @returns An acknowledgement carrying the id of the created order.
   */
  @SubscribeMessage('order:place')
  async handleOrder(client: Socket, orderData: CreateOrderDto) {
    // NOTE(review): orderData is not tied to the authenticated user here —
    // confirm it is validated/authorised elsewhere.
    // OrderService exposes `processOrder` (it has no `create` method).
    const order = await this.orderService.processOrder(orderData)

    // Notify seller
    this.server.to(`user:${order.sellerId}`).emit('order:new', order)

    // Update inventory for all users
    // NOTE(review): this broadcasts the ordered quantity, not the remaining
    // stock — confirm clients expect a delta.
    this.server.emit('inventory:updated', {
      productId: order.productId,
      quantity: order.quantity,
    })

    return { success: true, orderId: order.id }
  }
}
Real-time Inventory Management
Stock Synchronization
/**
 * Applies stock changes to products and publishes the resulting inventory
 * events on the injected event emitter.
 */
@Injectable()
export class InventoryService {
  constructor(
    @InjectModel('Product') private productModel: Model<Product>,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  /**
   * Atomically adds `delta` to a product's stock (use a negative delta to
   * reserve or remove stock) and emits `inventory.updated`, plus
   * `inventory.lowStock` when the new stock is below the product's threshold.
   *
   * @param productId Id of the product to update.
   * @param delta Amount added to the current stock; may be negative.
   * @param session Optional database session so the update can take part in a
   *   caller-managed transaction. The type is derived from the model (it is the
   *   session type returned by `Model.startSession()`).
   * @returns The product document after the update.
   * @throws Error when the product does not exist or a negative delta would
   *   drive the stock below zero.
   */
  async updateStock(
    productId: string,
    delta: number,
    session?: Awaited<ReturnType<Model<Product>['startSession']>>,
  ) {
    // For decrements, make "enough stock" part of the match so the check and
    // the $inc happen in one atomic operation and stock cannot go negative.
    const filter =
      delta < 0
        ? { _id: productId, stock: { $gte: -delta } }
        : { _id: productId }

    const product = await this.productModel.findOneAndUpdate(
      filter,
      { $inc: { stock: delta } },
      { new: true, session },
    )
    if (!product) {
      throw new Error(`Product ${productId} not found or has insufficient stock`)
    }

    // Emit event for WebSocket broadcast
    // NOTE(review): when a session/transaction is supplied these events fire
    // before the caller commits — confirm listeners tolerate a later rollback.
    this.eventEmitter.emit('inventory.updated', {
      productId,
      newStock: product.stock,
    })

    // Check low stock threshold
    if (product.stock < product.lowStockThreshold) {
      this.eventEmitter.emit('inventory.lowStock', product)
    }

    return product
  }
}
Optimistic Locking for Concurrency
// Product document shape; field types are written in long form.
const ProductSchema = new Schema({
  name: { type: String },
  stock: { type: Number },
  // Manual revision counter used for optimistic locking; starts at 0.
  version: { type: Number, default: 0 },
})
/**
 * Changes a product's stock using optimistic locking on the `version` field.
 *
 * Each attempt reads the product, then issues an update that only matches if
 * `version` is unchanged; a lost race is retried up to three times.
 *
 * @param productId Id of the product to update.
 * @param delta Amount added to the stock; negative values remove stock.
 * @returns `true` once the update has been applied.
 * @throws Error if the product does not exist, if a negative delta exceeds
 *   the stock that was read, or if every attempt lost the version race.
 */
async function updateStockSafely(productId: string, delta: number): Promise<boolean> {
  const maxAttempts = 3

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const product = await Product.findById(productId)
    if (!product) {
      throw new Error(`Product ${productId} not found`)
    }

    // Only removals need a stock guard; restocking must not require the
    // current stock to already be >= the amount being added.
    const removing = delta < 0
    if (removing && product.stock < -delta) {
      // Retrying cannot help here, so fail fast with a precise message.
      throw new Error(`Insufficient stock for product ${productId}`)
    }

    const result = await Product.updateOne(
      {
        _id: productId,
        version: product.version,
        ...(removing ? { stock: { $gte: -delta } } : {}), // Prevent negative stock
      },
      {
        $inc: { stock: delta, version: 1 },
      },
    )

    if (result.modifiedCount > 0) {
      return true // Success
    }

    // Linear backoff (100ms, 200ms); no point waiting after the last attempt.
    if (attempt < maxAttempts) {
      await sleep(100 * attempt)
    }
  }

  throw new Error('Failed to update stock after retries')
}
Dynamic Pricing Engine
/**
 * Computes a price from a product's base price, the ordered quantity, recent
 * demand and the product's location.
 */
@Injectable()
export class PricingService {
  /**
   * Multiplies the base price by the bulk, demand and location factors (in
   * that order) and rounds the result to two decimals.
   *
   * @param product Product being priced.
   * @param quantity Ordered quantity; more than 50 earns 5% off, more than 100 earns 10% off.
   * @returns The adjusted price rounded to two decimal places.
   */
  calculatePrice(product: Product, quantity: number): number {
    const startingPrice = product.basePrice

    // Bulk discount: 10% above 100 units, 5% above 50, otherwise none.
    const bulkFactor = quantity > 100 ? 0.9 : quantity > 50 ? 0.95 : 1

    // Market demand adjustment
    const demandFactor = this.getDemandMultiplier(product.id)

    // Location-based pricing
    const locationFactor = this.getLocationMultiplier(product.location)

    const adjusted = startingPrice * bulkFactor * demandFactor * locationFactor
    return Math.round(adjusted * 100) / 100
  }

  /**
   * Maps the number of orders in the last 24 hours to a price multiplier:
   * above 100 orders raises the price 10%, below 10 lowers it 10%.
   */
  private getDemandMultiplier(productId: string): number {
    const orderCount = this.getRecentOrders(productId, 24) // Last 24h
    if (orderCount > 100) {
      return 1.1 // High demand
    }
    return orderCount < 10 ? 0.9 /* Low demand */ : 1.0 /* Normal */
  }
}
Order Processing Pipeline
/**
 * Runs the order pipeline inside a single database transaction.
 *
 * NOTE(review): the constructor is not shown in this snippet; `connection`,
 * `inventoryService`, `pricingService`, `orderModel`, `paymentService` and
 * `eventEmitter` must be injected for this class to work.
 */
@Injectable()
export class OrderService {
  /**
   * Reserves stock, prices the order, persists it, takes payment and marks
   * the order confirmed — all within one transaction — then emits
   * `order.created`.
   *
   * @param createOrderDto Order request; `productId` and `quantity` are read here.
   * @returns The confirmed order document.
   * @throws Rethrows any error from a pipeline step; the transaction is
   *   aborted first if it is still open.
   */
  async processOrder(createOrderDto: CreateOrderDto) {
    const session = await this.connection.startSession()
    session.startTransaction()

    try {
      // 1. Reserve inventory (returns the updated product, reused for pricing)
      const product = await this.inventoryService.updateStock(
        createOrderDto.productId,
        -createOrderDto.quantity,
        session,
      )

      // 2. Calculate pricing — calculatePrice takes the product itself (not
      // its id) and is synchronous.
      const price = this.pricingService.calculatePrice(
        product,
        createOrderDto.quantity,
      )

      // 3. Create order (array form so the session option is honoured)
      const [order] = await this.orderModel.create(
        [
          {
            ...createOrderDto,
            price,
            status: 'pending',
          },
        ],
        { session },
      )

      // 4. Process payment
      // NOTE(review): confirm whether `price` is a unit price or the order
      // total; it is charged as-is. Also, if the payment has effects outside
      // the database, aborting the transaction will not undo them.
      const payment = await this.paymentService.process(
        {
          orderId: order.id,
          amount: price,
        },
        session,
      )

      // 5. Update order status
      order.status = 'confirmed'
      order.paymentId = payment.id
      await order.save({ session })

      await session.commitTransaction()

      // 6. Emit events
      this.eventEmitter.emit('order.created', order)

      return order
    } catch (error) {
      // Only abort while the transaction is still open: if the failure came
      // after a successful commit (e.g. a listener threw), aborting would
      // raise a second error and hide the original one.
      if (session.inTransaction()) {
        await session.abortTransaction()
      }
      throw error
    } finally {
      await session.endSession()
    }
  }
}
Scalability Patterns
Redis Caching Layer
/**
 * Redis-backed cache for product records, keyed as `product:<id>`.
 */
@Injectable()
export class CacheService {
  constructor(@InjectRedis() private readonly redis: Redis) {}

  /**
   * Writes every product to Redis as JSON in one pipelined batch, each entry
   * expiring after 300 seconds.
   *
   * @param products Products to cache.
   */
  async cacheInventory(products: Product[]) {
    const ttlSeconds = 300 // 5 min TTL
    const batch = this.redis.pipeline()

    products.forEach((item) => {
      const key = `product:${item.id}`
      const payload = JSON.stringify(item)
      batch.setex(key, ttlSeconds, payload)
    })

    await batch.exec()
  }

  /**
   * Reads a cached product.
   *
   * @param id Product id.
   * @returns The parsed cache entry, or `null` when nothing is stored.
   */
  async getProduct(id: string): Promise<Product | null> {
    const raw = await this.redis.get(`product:${id}`)
    if (!raw) {
      return null
    }
    return JSON.parse(raw)
  }
}
Rate Limiting
/**
 * Guard that allows at most 100 requests per caller in a 60-second window,
 * counted in Redis.
 *
 * NOTE(review): `this.redis` is used but no constructor is shown in this
 * snippet — a Redis client must be injected for the guard to work.
 */
@Injectable()
export class RateLimitGuard implements CanActivate {
  /**
   * Counts the request against the caller's window and rejects it once the
   * limit is exceeded.
   *
   * @param context Execution context of the incoming HTTP request.
   * @returns `true` when the request is within the limit.
   * @throws HttpException with status 429 when the limit is exceeded.
   */
  async canActivate(context: ExecutionContext): Promise<boolean> {
    const windowSeconds = 60 // 1 minute window
    const maxRequests = 100 // 100 requests per minute

    const request = context.switchToHttp().getRequest()

    // `request.user` is absent when no authentication has populated it; fall
    // back to the client address instead of crashing on `undefined.id`.
    const userId = request.user?.id
    const key =
      userId != null
        ? `rate-limit:${userId}`
        : `rate-limit:ip:${request.ip ?? 'unknown'}`

    const current = await this.redis.incr(key)
    if (current === 1) {
      await this.redis.expire(key, windowSeconds)
    }

    if (current > maxRequests) {
      // INCR and EXPIRE are two separate commands. If the EXPIRE was lost the
      // key has no TTL (-1) and the caller would stay blocked forever, so
      // restore the expiry before rejecting.
      if ((await this.redis.ttl(key)) === -1) {
        await this.redis.expire(key, windowSeconds)
      }
      throw new HttpException('Too many requests', 429)
    }

    return true
  }
}
Monitoring & Observability
/**
 * Holds the service's metric instruments and exposes small helpers to record
 * order events.
 */
@Injectable()
export class MetricsService {
  private readonly metrics: {
    ordersProcessed: Counter
    orderProcessingTime: Histogram
    activeWebSockets: Gauge
  }

  constructor() {
    // Instruments are created in the same order as they are listed below.
    const ordersProcessed = new Counter({
      name: 'orders_processed_total',
      help: 'Total orders processed',
    })
    const orderProcessingTime = new Histogram({
      name: 'order_processing_duration_seconds',
      help: 'Order processing duration',
    })
    const activeWebSockets = new Gauge({
      name: 'active_websocket_connections',
      help: 'Active WebSocket connections',
    })

    this.metrics = { ordersProcessed, orderProcessingTime, activeWebSockets }
  }

  /** Increments the processed-orders counter by one. */
  recordOrder() {
    const { ordersProcessed } = this.metrics
    ordersProcessed.inc()
  }

  /**
   * Adds one observation to the processing-time histogram.
   *
   * @param duration Observed duration (the metric name indicates seconds).
   */
  recordProcessingTime(duration: number) {
    const { orderProcessingTime } = this.metrics
    orderProcessingTime.observe(duration)
  }
}
Results
- 500+ orders/month processed reliably
- 800+ active users with real-time updates
- < 200ms API response time
- 99.5% uptime SLA
- 95% order completion rate
Key Learnings
- Transaction Management: Use MongoDB transactions for data integrity
- Optimistic Locking: Prevent race conditions in inventory
- Event-Driven Design: Decouple services with EventEmitter
- Caching Strategy: Redis for frequently accessed data
- Monitoring: Prometheus metrics from day one
Real-time trading platforms are complex, but NestJS provides the structure needed for maintainable, scalable systems.