mirror of
https://github.com/advplyr/audiobookshelf.git
synced 2025-08-17 18:40:59 +02:00
Update:Refactor socket connection management into SocketAuthority
This commit is contained in:
196
server/Server.js
196
server/Server.js
@ -1,7 +1,6 @@
|
||||
const Path = require('path')
|
||||
const express = require('express')
|
||||
const http = require('http')
|
||||
const SocketIO = require('socket.io')
|
||||
const fs = require('./libs/fsExtra')
|
||||
const fileUpload = require('./libs/expressFileupload')
|
||||
const rateLimit = require('./libs/expressRateLimit')
|
||||
@ -13,11 +12,11 @@ const dbMigration = require('./utils/dbMigration')
|
||||
const filePerms = require('./utils/filePerms')
|
||||
const Logger = require('./Logger')
|
||||
|
||||
// Classes
|
||||
const Auth = require('./Auth')
|
||||
const Watcher = require('./Watcher')
|
||||
const Scanner = require('./scanner/Scanner')
|
||||
const Db = require('./Db')
|
||||
const SocketAuthority = require('./SocketAuthority')
|
||||
|
||||
const ApiRouter = require('./routers/ApiRouter')
|
||||
const HlsRouter = require('./routers/HlsRouter')
|
||||
@ -67,70 +66,30 @@ class Server {
|
||||
this.auth = new Auth(this.db)
|
||||
|
||||
// Managers
|
||||
this.taskManager = new TaskManager(this.emitter.bind(this))
|
||||
this.notificationManager = new NotificationManager(this.db, this.emitter.bind(this))
|
||||
this.backupManager = new BackupManager(this.db, this.emitter.bind(this))
|
||||
this.taskManager = new TaskManager()
|
||||
this.notificationManager = new NotificationManager(this.db)
|
||||
this.backupManager = new BackupManager(this.db)
|
||||
this.logManager = new LogManager(this.db)
|
||||
this.cacheManager = new CacheManager()
|
||||
this.abMergeManager = new AbMergeManager(this.db, this.taskManager, this.clientEmitter.bind(this))
|
||||
this.playbackSessionManager = new PlaybackSessionManager(this.db, this.emitter.bind(this), this.clientEmitter.bind(this))
|
||||
this.abMergeManager = new AbMergeManager(this.db, this.taskManager)
|
||||
this.playbackSessionManager = new PlaybackSessionManager(this.db)
|
||||
this.coverManager = new CoverManager(this.db, this.cacheManager)
|
||||
this.podcastManager = new PodcastManager(this.db, this.watcher, this.emitter.bind(this), this.notificationManager)
|
||||
this.audioMetadataManager = new AudioMetadataMangaer(this.db, this.taskManager, this.emitter.bind(this), this.clientEmitter.bind(this))
|
||||
this.rssFeedManager = new RssFeedManager(this.db, this.emitter.bind(this))
|
||||
this.podcastManager = new PodcastManager(this.db, this.watcher, this.notificationManager)
|
||||
this.audioMetadataManager = new AudioMetadataMangaer(this.db, this.taskManager)
|
||||
this.rssFeedManager = new RssFeedManager(this.db)
|
||||
|
||||
this.scanner = new Scanner(this.db, this.coverManager, this.emitter.bind(this))
|
||||
this.scanner = new Scanner(this.db, this.coverManager)
|
||||
this.cronManager = new CronManager(this.db, this.scanner, this.podcastManager)
|
||||
|
||||
// Routers
|
||||
this.apiRouter = new ApiRouter(this.db, this.auth, this.scanner, this.playbackSessionManager, this.abMergeManager, this.coverManager, this.backupManager, this.watcher, this.cacheManager, this.podcastManager, this.audioMetadataManager, this.rssFeedManager, this.cronManager, this.notificationManager, this.taskManager, this.getUsersOnline.bind(this), this.emitter.bind(this), this.clientEmitter.bind(this))
|
||||
this.hlsRouter = new HlsRouter(this.db, this.auth, this.playbackSessionManager, this.emitter.bind(this))
|
||||
this.apiRouter = new ApiRouter(this)
|
||||
this.hlsRouter = new HlsRouter(this.db, this.auth, this.playbackSessionManager)
|
||||
this.staticRouter = new StaticRouter(this.db)
|
||||
|
||||
Logger.logManager = this.logManager
|
||||
|
||||
this.server = null
|
||||
this.io = null
|
||||
|
||||
this.clients = {}
|
||||
}
|
||||
|
||||
// returns an array of User.toJSONForPublic with `connections` for the # of socket connections
|
||||
// a user can have many socket connections
|
||||
getUsersOnline() {
|
||||
const onlineUsersMap = {}
|
||||
Object.values(this.clients).filter(c => c.user).forEach(client => {
|
||||
if (onlineUsersMap[client.user.id]) {
|
||||
onlineUsersMap[client.user.id].connections++
|
||||
} else {
|
||||
onlineUsersMap[client.user.id] = {
|
||||
...client.user.toJSONForPublic(this.playbackSessionManager.sessions, this.db.libraryItems),
|
||||
connections: 1
|
||||
}
|
||||
}
|
||||
})
|
||||
return Object.values(onlineUsersMap)
|
||||
}
|
||||
|
||||
getClientsForUser(userId) {
|
||||
return Object.values(this.clients).filter(c => c.user && c.user.id === userId)
|
||||
}
|
||||
|
||||
emitter(ev, data) {
|
||||
// Logger.debug('EMITTER', ev)
|
||||
this.io.emit(ev, data)
|
||||
}
|
||||
|
||||
clientEmitter(userId, ev, data) {
|
||||
var clients = this.getClientsForUser(userId)
|
||||
if (!clients.length) {
|
||||
return Logger.debug(`[Server] clientEmitter - no clients found for user ${userId}`)
|
||||
}
|
||||
clients.forEach((client) => {
|
||||
if (client.socket) {
|
||||
client.socket.emit(ev, data)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
authMiddleware(req, res, next) {
|
||||
@ -283,71 +242,8 @@ class Server {
|
||||
Logger.info(`Listening on http://${this.Host}:${this.Port}`)
|
||||
})
|
||||
|
||||
this.io = new SocketIO.Server(this.server, {
|
||||
cors: {
|
||||
origin: '*',
|
||||
methods: ["GET", "POST"]
|
||||
}
|
||||
})
|
||||
this.io.on('connection', (socket) => {
|
||||
this.clients[socket.id] = {
|
||||
id: socket.id,
|
||||
socket,
|
||||
connected_at: Date.now()
|
||||
}
|
||||
socket.sheepClient = this.clients[socket.id]
|
||||
|
||||
Logger.info('[Server] Socket Connected', socket.id)
|
||||
|
||||
// Required for associating a User with a socket
|
||||
socket.on('auth', (token) => this.authenticateSocket(socket, token))
|
||||
|
||||
// Scanning
|
||||
socket.on('cancel_scan', this.cancelScan.bind(this))
|
||||
|
||||
// Logs
|
||||
socket.on('set_log_listener', (level) => Logger.addSocketListener(socket, level))
|
||||
socket.on('remove_log_listener', () => Logger.removeSocketListener(socket.id))
|
||||
socket.on('fetch_daily_logs', () => this.logManager.socketRequestDailyLogs(socket))
|
||||
|
||||
// Events for testing
|
||||
socket.on('message_all_users', (payload) => {
|
||||
// admin user can send a message to all authenticated users
|
||||
// displays on the web app as a toast
|
||||
const client = this.clients[socket.id] || {}
|
||||
if (client.user && client.user.isAdminOrUp) {
|
||||
this.emitter('admin_message', payload.message || '')
|
||||
} else {
|
||||
Logger.error(`[Server] Non-admin user sent the message_all_users event`)
|
||||
}
|
||||
})
|
||||
socket.on('ping', () => {
|
||||
const client = this.clients[socket.id] || {}
|
||||
const user = client.user || {}
|
||||
Logger.debug(`[Server] Received ping from socket ${user.username || 'No User'}`)
|
||||
socket.emit('pong')
|
||||
})
|
||||
|
||||
// Sent automatically from socket.io clients
|
||||
socket.on('disconnect', (reason) => {
|
||||
Logger.removeSocketListener(socket.id)
|
||||
|
||||
const _client = this.clients[socket.id]
|
||||
if (!_client) {
|
||||
Logger.warn(`[Server] Socket ${socket.id} disconnect, no client (Reason: ${reason})`)
|
||||
} else if (!_client.user) {
|
||||
Logger.info(`[Server] Unauth socket ${socket.id} disconnected (Reason: ${reason})`)
|
||||
delete this.clients[socket.id]
|
||||
} else {
|
||||
Logger.debug('[Server] User Offline ' + _client.user.username)
|
||||
this.io.emit('user_offline', _client.user.toJSONForPublic(this.playbackSessionManager.sessions, this.db.libraryItems))
|
||||
|
||||
const disconnectTime = Date.now() - _client.connected_at
|
||||
Logger.info(`[Server] Socket ${socket.id} disconnected from client "${_client.user.username}" after ${disconnectTime}ms (Reason: ${reason})`)
|
||||
delete this.clients[socket.id]
|
||||
}
|
||||
})
|
||||
})
|
||||
// Start listening for socket connections
|
||||
SocketAuthority.initialize(this)
|
||||
}
|
||||
|
||||
async initializeServer(req, res) {
|
||||
@ -366,11 +262,6 @@ class Server {
|
||||
await this.scanner.scanFilesChanged(fileUpdates)
|
||||
}
|
||||
|
||||
cancelScan(id) {
|
||||
Logger.debug('[Server] Cancel scan', id)
|
||||
this.scanner.setCancelLibraryScan(id)
|
||||
}
|
||||
|
||||
// Remove unused /metadata/items/{id} folders
|
||||
async purgeMetadata() {
|
||||
var itemsMetadata = Path.join(global.MetadataPath, 'items')
|
||||
@ -449,68 +340,11 @@ class Server {
|
||||
logout(req, res) {
|
||||
var { socketId } = req.body
|
||||
Logger.info(`[Server] User ${req.user ? req.user.username : 'Unknown'} is logging out with socket ${socketId}`)
|
||||
|
||||
// Strip user and client from client and client socket
|
||||
if (socketId && this.clients[socketId]) {
|
||||
var client = this.clients[socketId]
|
||||
var clientSocket = client.socket
|
||||
Logger.debug(`[Server] Found user client ${clientSocket.id}, Has user: ${!!client.user}, Socket has client: ${!!clientSocket.sheepClient}`)
|
||||
|
||||
if (client.user) {
|
||||
Logger.debug('[Server] User Offline ' + client.user.username)
|
||||
this.io.emit('user_offline', client.user.toJSONForPublic(null, this.db.libraryItems))
|
||||
}
|
||||
|
||||
delete this.clients[socketId].user
|
||||
if (clientSocket && clientSocket.sheepClient) delete this.clients[socketId].socket.sheepClient
|
||||
} else if (socketId) {
|
||||
Logger.warn(`[Server] No client for socket ${socketId}`)
|
||||
}
|
||||
SocketAuthority.logout(socketId)
|
||||
|
||||
res.sendStatus(200)
|
||||
}
|
||||
|
||||
// When setting up a socket connection the user needs to be associated with a socket id
|
||||
// for this the client will send a 'auth' event that includes the users API token
|
||||
async authenticateSocket(socket, token) {
|
||||
const user = await this.auth.authenticateUser(token)
|
||||
if (!user) {
|
||||
Logger.error('Cannot validate socket - invalid token')
|
||||
return socket.emit('invalid_token')
|
||||
}
|
||||
const client = this.clients[socket.id]
|
||||
|
||||
if (client.user !== undefined) {
|
||||
Logger.debug(`[Server] Authenticating socket client already has user`, client.user.username)
|
||||
}
|
||||
|
||||
client.user = user
|
||||
|
||||
if (!client.user.toJSONForBrowser) {
|
||||
Logger.error('Invalid user...', client.user)
|
||||
return
|
||||
}
|
||||
|
||||
Logger.debug(`[Server] User Online ${client.user.username}`)
|
||||
|
||||
// TODO: Send to authenticated clients only
|
||||
this.io.emit('user_online', client.user.toJSONForPublic(this.playbackSessionManager.sessions, this.db.libraryItems))
|
||||
|
||||
user.lastSeen = Date.now()
|
||||
await this.db.updateEntity('user', user)
|
||||
|
||||
const initialPayload = {
|
||||
userId: client.user.id,
|
||||
username: client.user.username,
|
||||
librariesScanning: this.scanner.librariesScanning
|
||||
}
|
||||
if (user.isAdminOrUp) {
|
||||
initialPayload.usersOnline = this.getUsersOnline()
|
||||
}
|
||||
|
||||
client.socket.emit('init', initialPayload)
|
||||
}
|
||||
|
||||
async stop() {
|
||||
await this.watcher.close()
|
||||
Logger.info('Watcher Closed')
|
||||
|
Reference in New Issue
Block a user