2021-08-18 00:01:11 +02:00
const Ffmpeg = require ( 'fluent-ffmpeg' )
const EventEmitter = require ( 'events' )
const Path = require ( 'path' )
const fs = require ( 'fs-extra' )
2021-09-04 21:17:26 +02:00
const Logger = require ( '../Logger' )
2021-11-25 03:15:50 +01:00
const { getId , secondsToTimestamp } = require ( '../utils/index' )
2021-09-04 21:17:26 +02:00
const { writeConcatFile } = require ( '../utils/ffmpegHelpers' )
const hlsPlaylistGenerator = require ( '../utils/hlsPlaylistGenerator' )
2021-08-18 00:01:11 +02:00
2021-11-13 02:43:16 +01:00
const UserListeningSession = require ( './UserListeningSession' )
2021-11-06 02:24:02 +01:00
2021-08-18 00:01:11 +02:00
/**
 * One HLS transcode session for a single client and a single audiobook.
 *
 * The stream owns a working directory (`<streamPath>/<id>`), runs ffmpeg to write HLS
 * segments into it, polls that directory to report progress over the client's socket,
 * and restarts the transcode at a new position when a requested segment lies outside
 * of what has been (or is about to be) produced.
 *
 * Emits `closed` once `close()` has finished cleaning up.
 */
class Stream extends EventEmitter {
  /**
   * @param {string} streamPath - parent directory; this stream works inside `<streamPath>/<id>`
   * @param {Object} client - client record; `client.user` is read immediately, `client.socket` lazily
   * @param {Object} audiobook - audiobook being streamed (`id`, `title`, `totalDuration`, `tracks`, `toJSONMinified()` are used)
   * @param {Object} [transcodeOptions] - `forceAAC` is the only option read by this class
   */
  constructor(streamPath, client, audiobook, transcodeOptions = {}) {
    super()

    this.id = getId('str')
    this.client = client
    this.audiobook = audiobook
    this.transcodeOptions = transcodeOptions

    // Seconds per HLS segment, shared by the playlist, the segment maths and ffmpeg
    this.segmentLength = 6
    // Seconds transcoded ahead of the requested start time
    this.maxSeekBackTime = 30

    this.streamPath = Path.join(streamPath, this.id)
    this.concatFilesPath = Path.join(this.streamPath, 'files.txt')
    this.playlistPath = Path.join(this.streamPath, 'output.m3u8')
    this.finalPlaylistPath = Path.join(this.streamPath, 'final-output.m3u8')

    this.startTime = 0
    this.ffmpeg = null
    this.loop = null
    this.isResetting = false
    this.isClientInitialized = false
    this.isTranscodeComplete = false
    this.segmentsCreated = new Set()
    this.furthestSegmentCreated = 0
    this.clientCurrentTime = 0

    this.listeningSession = new UserListeningSession()
    this.listeningSession.setData(audiobook, client.user)

    this.init()
  }

  /** Socket of the attached client, or null when there is no client or no socket. */
  get socket() {
    return this.client ? this.client.socket || null : null
  }

  get audiobookId() {
    return this.audiobook.id
  }

  get audiobookTitle() {
    return this.audiobook ? this.audiobook.title : null
  }

  get totalDuration() {
    return this.audiobook.totalDuration
  }

  /** Lower-cased extension of the first track without the leading dot, or null when there are no tracks. */
  get tracksAudioFileType() {
    if (!this.tracks.length) return null
    return this.tracks[0].ext.toLowerCase().slice(1)
  }

  /** 'fmp4' as soon as any track is FLAC, otherwise 'mpegts'. */
  get hlsSegmentType() {
    const hasFlac = this.tracks.some((track) => track.ext.toLowerCase() === '.flac')
    return hasFlac ? 'fmp4' : 'mpegts'
  }

  /** ffmpeg filename pattern for segments; the extension follows the segment type. */
  get segmentBasename() {
    return this.hlsSegmentType === 'fmp4' ? 'output-%d.m4s' : 'output-%d.ts'
  }

  /** Number of the first segment ffmpeg writes, accounting for the seek-back window. */
  get segmentStartNumber() {
    if (!this.startTime) return 0
    return Math.floor(Math.max(this.startTime - this.maxSeekBackTime, 0) / this.segmentLength)
  }

  /** Total segment count for the whole audiobook; a trailing partial segment counts as one. */
  get numSegments() {
    let numSegs = Math.floor(this.totalDuration / this.segmentLength)
    if (this.totalDuration - (numSegs * this.segmentLength) > 0) {
      numSegs++
    }
    return numSegs
  }

  get tracks() {
    return this.audiobook.tracks
  }

  /** User of the attached client: `{}` when the client has no user, null when there is no client. */
  get clientUser() {
    return this.client ? this.client.user || {} : null
  }

  get userToken() {
    return this.clientUser ? this.clientUser.token : null
  }

  get clientUserAudiobooks() {
    return this.client ? this.clientUser.audiobooks || {} : null
  }

  /** The user's saved record for this audiobook, if any. */
  get clientUserAudiobookData() {
    return this.client ? this.clientUserAudiobooks[this.audiobookId] : null
  }

  get clientPlaylistUri() {
    return `/hls/${this.id}/output.m3u8`
  }

  /** Client position as a fraction of the total duration, capped at 1 and rounded to 3 decimals. */
  get clientProgress() {
    if (!this.clientCurrentTime) return 0
    const progress = Math.min(1, this.clientCurrentTime / this.totalDuration)
    return Number(progress.toFixed(3))
  }

  get isAACEncodable() {
    return ['mp4', 'm4a', 'm4b'].includes(this.tracksAudioFileType)
  }

  get transcodeForceAAC() {
    return !!this.transcodeOptions.forceAAC
  }

  /**
   * Plain-object snapshot of the stream.
   * Safe to call without a client: `clientId` and `userId` are then null, matching
   * the null-tolerance of the client accessors above.
   * @returns {Object}
   */
  toJSON() {
    const user = this.clientUser
    const userAudiobook = this.clientUserAudiobookData
    return {
      id: this.id,
      clientId: this.client ? this.client.id : null,
      userId: user ? user.id : null,
      audiobook: this.audiobook.toJSONMinified(),
      segmentLength: this.segmentLength,
      playlistPath: this.playlistPath,
      clientPlaylistUri: this.clientPlaylistUri,
      clientCurrentTime: this.clientCurrentTime,
      startTime: this.startTime,
      segmentStartNumber: this.segmentStartNumber,
      isTranscodeComplete: this.isTranscodeComplete,
      lastUpdate: userAudiobook ? userAudiobook.lastUpdate : 0
    }
  }

  /**
   * Resume from the user's saved position when one exists and more than 15 seconds
   * of the audiobook remain; otherwise the stream starts at 0.
   */
  init() {
    const userAudiobook = this.clientUserAudiobookData
    if (!userAudiobook) return

    const timeRemaining = this.totalDuration - userAudiobook.currentTime
    Logger.info('[STREAM] User has progress for audiobook', userAudiobook.progress, `Time Remaining: ${timeRemaining}s`)
    if (timeRemaining > 15) {
      this.startTime = userAudiobook.currentTime
      this.clientCurrentTime = this.startTime
    }
  }

  /**
   * Decide whether a requested segment needs the transcode to be restarted.
   * A restart happens when the segment starts before the current start time, or when
   * it is more than 10 segments past the furthest one created while still transcoding.
   * @param {number} segNum - requested segment number
   * @returns {Promise<number|false>} the segment's start time when a reset was issued, otherwise false
   */
  async checkSegmentNumberRequest(segNum) {
    const segStartTime = segNum * this.segmentLength
    // Restart two segments early
    const resetTime = segStartTime - (this.segmentLength * 2)

    if (this.startTime > segStartTime) {
      Logger.warn(`[STREAM] Segment #${segNum} Request @${secondsToTimestamp(segStartTime)} is before start time (${secondsToTimestamp(this.startTime)}) - Reset Transcode`)
      await this.reset(resetTime)
      return segStartTime
    }
    if (this.isTranscodeComplete) {
      return false
    }

    const distanceFromFurthestSegment = segNum - this.furthestSegmentCreated
    if (distanceFromFurthestSegment > 10) {
      Logger.info(`Segment #${segNum} requested is ${distanceFromFurthestSegment} segments from latest (${secondsToTimestamp(segStartTime)}) - Reset Transcode`)
      await this.reset(resetTime)
      return segStartTime
    }
    return false
  }

  updateClientCurrentTime(currentTime) {
    Logger.debug('[Stream] Updated client current time', secondsToTimestamp(currentTime))
    this.clientCurrentTime = currentTime
  }

  /**
   * Apply a sync payload from the client.
   * @param {Object} payload
   * @param {number} [payload.timeListened] - seconds listened since the last sync
   * @param {number|null} [payload.currentTime] - client playback position in seconds
   * @returns {Object|null} the listening session when listening time was added (the caller
   *   is expected to persist it), otherwise null
   */
  syncStream({ timeListened, currentTime }) {
    let syncLog = ''
    if (currentTime !== null && !isNaN(currentTime)) {
      syncLog = `Update client current time ${secondsToTimestamp(currentTime)}`
      this.clientCurrentTime = currentTime
    }

    let saveListeningSession = false
    if (timeListened && !isNaN(timeListened)) {
      // Check if listening session should roll to next day
      if (this.listeningSession.checkDateRollover()) {
        if (!this.clientUser) {
          Logger.error(`[Stream] Sync stream invalid client user`)
          return null
        }
        this.listeningSession = new UserListeningSession()
        this.listeningSession.setData(this.audiobook, this.clientUser)
        Logger.debug(`[Stream] Listening session rolled to next day`)
      }
      this.listeningSession.addListeningTime(timeListened)
      if (syncLog) syncLog += ' | '
      syncLog += `Add listening time ${timeListened}s, Total time listened ${this.listeningSession.timeListening}s`
      saveListeningSession = true
    }

    Logger.debug('[Stream]', syncLog)
    return saveListeningSession ? this.listeningSession : null
  }

  /**
   * Create the stream directory and write the client-facing playlist.
   * @returns {Promise<string>} URI the client should request
   */
  async generatePlaylist() {
    fs.ensureDirSync(this.streamPath)
    await hlsPlaylistGenerator(this.playlistPath, 'output', this.totalDuration, this.segmentLength, this.hlsSegmentType, this.userToken)
    return this.clientPlaylistUri
  }

  /**
   * Scan the stream directory for segment files, update the created-segment bookkeeping
   * and push a `stream_progress` event to the client. Also sends `stream_open` the first
   * time more than 6 segments exist. Failures are logged, never thrown.
   */
  async checkFiles() {
    try {
      const files = await fs.readdir(this.streamPath)
      files.forEach((file) => {
        const extname = Path.extname(file)
        if (extname !== '.ts' && extname !== '.m4s') return

        // Segment files are named "<prefix>-<number><ext>"
        const segmentNumber = Number(Path.basename(file, extname).split('-')[1])
        // A stray file without a numeric suffix would otherwise add NaN, corrupting the sort below
        if (!Number.isInteger(segmentNumber)) return
        this.segmentsCreated.add(segmentNumber)
      })

      if (!this.segmentsCreated.size) {
        Logger.warn('No Segments')
        return
      }

      if (this.segmentsCreated.size > 6 && !this.isClientInitialized) {
        this.isClientInitialized = true
        if (this.socket) {
          Logger.info(`[STREAM] ${this.id} notifying client that stream is ready`)
          this.socket.emit('stream_open', this.toJSON())
        }
      }

      const segments = Array.from(this.segmentsCreated).sort((a, b) => a - b)
      const lastSegment = segments[segments.length - 1]
      if (lastSegment > this.furthestSegmentCreated) {
        this.furthestSegmentCreated = lastSegment
      }

      // Collapse consecutive segment numbers: a lone segment stays a number, a run becomes "first-last"
      const chunks = []
      let runStart = null
      let runEnd = null
      const flushRun = () => {
        if (runStart === null) return
        chunks.push(runStart === runEnd ? runStart : `${runStart}-${runEnd}`)
      }
      segments.forEach((seg) => {
        if (runStart !== null && runEnd + 1 === seg) {
          runEnd = seg
          return
        }
        flushRun()
        runStart = seg
        runEnd = seg
      })
      flushRun()

      const perc = (this.segmentsCreated.size * 100 / this.numSegments).toFixed(2) + '%'
      Logger.info('[STREAM-CHECK] Check Files', this.segmentsCreated.size, 'of', this.numSegments, perc, `Furthest Segment: ${this.furthestSegmentCreated}`)
      if (this.socket) {
        this.socket.emit('stream_progress', {
          stream: this.id,
          percent: perc,
          chunks,
          numSegments: this.numSegments
        })
      }
    } catch (error) {
      Logger.error('Failed checking files', error)
    }
  }

  /**
   * (Re)start the 2 second progress poll. Sends an initial empty `stream_progress`, then
   * runs `checkFiles()` on every tick until the transcode completes, at which point
   * `stream_ready` is sent and the poll stops itself.
   */
  startLoop() {
    if (this.socket) {
      this.socket.emit('stream_progress', { stream: this.id, chunks: [], numSegments: 0, percent: '0%' })
    }

    clearInterval(this.loop)
    const intervalId = setInterval(() => {
      if (!this.isTranscodeComplete) {
        this.checkFiles()
        return
      }
      if (this.socket) {
        Logger.info(`[Stream] ${this.audiobookTitle} sending stream_ready`)
        this.socket.emit('stream_ready')
      }
      clearInterval(intervalId)
    }, 2000)
    this.loop = intervalId
  }

  /**
   * Write the concat file and launch ffmpeg from `startTime` minus the seek-back window.
   * Resolves once the command has been handed to ffmpeg; progress and completion are
   * reported through the ffmpeg event handlers registered here.
   */
  async start() {
    Logger.info(`[STREAM] START STREAM - Num Segments: ${this.numSegments}`)

    const command = Ffmpeg()
    this.ffmpeg = command

    const adjustedStartTime = Math.max(this.startTime - this.maxSeekBackTime, 0)
    const trackStartTime = await writeConcatFile(this.tracks, this.concatFilesPath, adjustedStartTime)

    command.addInput(this.concatFilesPath)
    // seek_timestamp : https://ffmpeg.org/ffmpeg.html
    // the argument to the -ss option is considered an actual timestamp, and is not offset by the start time of the file
    // fixes https://github.com/advplyr/audiobookshelf/issues/116
    command.inputOption('-seek_timestamp 1')
    command.inputFormat('concat')
    command.inputOption('-safe 0')

    if (adjustedStartTime > 0) {
      const shiftedStartTime = adjustedStartTime - trackStartTime
      // Issues using exact fractional seconds i.e. 29.49814 - changing to 29.5s
      const startTimeS = Math.round(shiftedStartTime * 10) / 10 + 's'
      Logger.info(`[STREAM] Starting Stream at startTime ${secondsToTimestamp(adjustedStartTime)} (User startTime ${secondsToTimestamp(this.startTime)}) and Segment #${this.segmentStartNumber}`)
      command.inputOption(`-ss ${startTimeS}`)
      command.inputOption('-noaccurate_seek')
    }

    const logLevel = process.env.NODE_ENV === 'production' ? 'error' : 'warning'
    const mustEncode = this.hlsSegmentType === 'fmp4' || this.tracksAudioFileType === 'opus' || this.transcodeForceAAC
    const audioCodec = mustEncode ? 'aac' : 'copy'
    command.addOption([
      `-loglevel ${logLevel}`,
      '-map 0:a',
      `-c:a ${audioCodec}`
    ])

    const hlsOptions = [
      '-f hls',
      '-copyts',
      '-avoid_negative_ts make_non_negative',
      '-max_delay 5000000',
      '-max_muxing_queue_size 2048',
      // Must match the segment length used for the playlist and the segment-number maths
      `-hls_time ${this.segmentLength}`,
      `-hls_segment_type ${this.hlsSegmentType}`,
      `-start_number ${this.segmentStartNumber}`,
      '-hls_playlist_type vod',
      '-hls_list_size 0',
      '-hls_allow_cache 0'
    ]
    if (this.hlsSegmentType === 'fmp4') {
      hlsOptions.push('-strict -2')
      // Bare filename rather than a path under streamPath.
      // NOTE(review): presumably resolved by ffmpeg relative to the playlist directory - confirm
      const fmp4InitFilename = 'init.mp4'
      hlsOptions.push(`-hls_fmp4_init_filename ${fmp4InitFilename}`)
    }
    command.addOption(hlsOptions)

    // Flag and value go in as separate arguments so a stream directory containing
    // spaces still reaches ffmpeg as a single filename argument
    const segmentFilename = Path.join(this.streamPath, this.segmentBasename)
    command.addOption('-hls_segment_filename', segmentFilename)
    command.output(this.finalPlaylistPath)

    command.on('start', (commandLine) => {
      Logger.info('[INFO] FFMPEG transcoding started with command: ' + commandLine)
      Logger.info('')
      if (!this.isResetting) {
        this.startLoop()
        return
      }
      // AAC encode is much slower
      const clearIsResettingTime = this.transcodeForceAAC ? 3000 : 500
      setTimeout(() => {
        Logger.info('[STREAM] Clearing isResetting')
        this.isResetting = false
        this.startLoop()
      }, clearIsResettingTime)
    })

    command.on('stderr', (stdErrline) => {
      Logger.info(stdErrline)
    })

    command.on('error', (err) => {
      // An 'error' event means the ffmpeg process failed to start or has already exited.
      // Drop the handle and stop polling first, so reset() below does not spend 10 seconds
      // waiting for a dead process to go away.
      this.ffmpeg = null
      clearInterval(this.loop)

      if (err.message && err.message.includes('SIGKILL')) {
        // This is an intentional SIGKILL
        Logger.info('[FFMPEG] Transcode Killed')
        return
      }

      Logger.error('Ffmpeg Err', `"${err.message}"`)

      // Temporary workaround for https://github.com/advplyr/audiobookshelf/issues/172
      const aacErrorMsg = 'ffmpeg exited with code 1: Could not write header for output file #0 (incorrect codec parameters ?)'
      const canRetryWithAAC = audioCodec === 'copy' && this.isAACEncodable && err.message && err.message.startsWith(aacErrorMsg)
      if (canRetryWithAAC) {
        Logger.info(`[Stream] Re-attempting stream with AAC encode`)
        this.transcodeOptions.forceAAC = true
        this.reset(this.startTime)
      } else {
        // Close stream show error
        this.close(err.message)
      }
    })

    command.on('end', () => {
      Logger.info('[FFMPEG] Transcoding ended')
      // For very small fast load
      if (!this.isClientInitialized) {
        this.isClientInitialized = true
        if (this.socket) {
          Logger.info(`[STREAM] ${this.id} notifying client that stream is ready`)
          this.socket.emit('stream_open', this.toJSON())
        }
      }
      this.isTranscodeComplete = true
      this.ffmpeg = null
      clearInterval(this.loop)
    })

    command.run()
  }

  /**
   * Stop polling, kill ffmpeg, delete the stream directory, notify the client and emit `closed`.
   * @param {string|null} [errorMessage] - when set the client receives `stream_error`
   *   instead of `stream_closed`
   */
  async close(errorMessage = null) {
    clearInterval(this.loop)

    Logger.info('Closing Stream', this.id)
    if (this.ffmpeg) {
      this.ffmpeg.kill('SIGKILL')
    }

    try {
      await fs.remove(this.streamPath)
      Logger.info('Deleted session data', this.streamPath)
    } catch (err) {
      // Best effort: a failed cleanup must not prevent the client from being notified
      Logger.error('Failed to delete session data', err)
    }

    if (this.socket) {
      if (errorMessage) this.socket.emit('stream_error', { id: this.id, error: (errorMessage || '').trim() })
      else this.socket.emit('stream_closed', this.id)
    }

    this.emit('closed')
  }

  /** Stop polling and SIGKILL the running ffmpeg process, if any. */
  cancelTranscode() {
    clearInterval(this.loop)
    if (this.ffmpeg) {
      this.ffmpeg.kill('SIGKILL')
    }
  }

  /**
   * Wait for the ffmpeg handle to be cleared by its error/end handler.
   * Polls every 500ms, up to 20 times (10 seconds).
   * @returns {Promise<boolean>} true once the handle is gone, false on timeout
   */
  async waitCancelTranscode() {
    for (let i = 0; i < 20; i++) {
      if (!this.ffmpeg) return true
      await new Promise((resolve) => setTimeout(resolve, 500))
    }
    Logger.error('[STREAM] Transcode never closed...')
    return false
  }

  /**
   * Cancel the running transcode and start a new one at `time`.
   * A reset that arrives while another is in progress is ignored.
   * @param {number} time - new start time in seconds; negative values are clamped to 0
   */
  async reset(time) {
    if (this.isResetting) {
      return Logger.info(`[STREAM] Stream ${this.id} already resetting`)
    }
    const newStartTime = Math.max(0, time)

    this.isResetting = true
    if (this.ffmpeg) {
      this.cancelTranscode()
      await this.waitCancelTranscode()
    }

    this.isTranscodeComplete = false
    this.startTime = newStartTime
    this.clientCurrentTime = this.startTime
    Logger.info(`Stream Reset New Start Time ${secondsToTimestamp(this.startTime)}`)

    try {
      await this.start()
    } catch (error) {
      // isResetting is normally cleared by the ffmpeg 'start' event. If ffmpeg was never
      // launched that event will not fire, so release the flag here or every later reset
      // would be ignored.
      Logger.error('[STREAM] Failed to restart transcode', error)
      this.isResetting = false
    }
  }
}
// CommonJS export: the Stream class is this module's only export.
module . exports = Stream