2021-08-18 00:01:11 +02:00
const Ffmpeg = require ( 'fluent-ffmpeg' )
const EventEmitter = require ( 'events' )
const Path = require ( 'path' )
const fs = require ( 'fs-extra' )
2021-09-04 21:17:26 +02:00
const Logger = require ( '../Logger' )
2021-11-25 03:15:50 +01:00
const { getId , secondsToTimestamp } = require ( '../utils/index' )
2021-09-04 21:17:26 +02:00
const { writeConcatFile } = require ( '../utils/ffmpegHelpers' )
const hlsPlaylistGenerator = require ( '../utils/hlsPlaylistGenerator' )
2022-03-18 01:10:47 +01:00
const AudioTrack = require ( './files/AudioTrack' )
2021-11-06 02:24:02 +01:00
2021-08-18 00:01:11 +02:00
/**
 * HLS transcoding session for one media entity of a library item.
 *
 * Runs an ffmpeg command that writes HLS segments into a per-session
 * directory, polls that directory to learn which segments exist, and reports
 * progress to the client through `clientEmitter`. Emits a `closed` event once
 * `close()` has finished.
 */
class Stream extends EventEmitter {
  /**
   * @param {string} sessionId - Used as the stream id and as the session directory name.
   * @param {string} streamPath - Parent directory; session files live in `<streamPath>/<sessionId>`.
   * @param {Object} user - Must expose `id` and `token`.
   * @param {Object} libraryItem - Must expose `id`, `media.metadata.title` and `toJSONExpanded()`.
   * @param {Object} mediaEntity - Must expose `name`, `duration` and `tracks`.
   * @param {number} startTime - Requested playback position in seconds.
   * @param {Function} clientEmitter - Invoked as `clientEmitter(userId, evtName, data)`.
   * @param {Object} [transcodeOptions] - `forceAAC: true` forces an AAC re-encode.
   */
  constructor(sessionId, streamPath, user, libraryItem, mediaEntity, startTime, clientEmitter, transcodeOptions = {}) {
    super()

    this.id = sessionId
    this.user = user
    this.libraryItem = libraryItem
    this.mediaEntity = mediaEntity
    this.clientEmitter = clientEmitter

    this.transcodeOptions = transcodeOptions

    // Segment duration in seconds (passed to ffmpeg as -hls_time)
    this.segmentLength = 6
    // Seconds of audio transcoded ahead of the requested start time
    this.maxSeekBackTime = 30
    this.streamPath = Path.join(streamPath, this.id)
    this.concatFilesPath = Path.join(this.streamPath, 'files.txt')
    this.playlistPath = Path.join(this.streamPath, 'output.m3u8')
    this.finalPlaylistPath = Path.join(this.streamPath, 'final-output.m3u8')
    this.startTime = startTime

    this.ffmpeg = null
    this.loop = null
    this.isResetting = false
    this.isClientInitialized = false
    this.isTranscodeComplete = false
    this.segmentsCreated = new Set()
    this.furthestSegmentCreated = 0
    // this.clientCurrentTime = 0

    this.init()
  }

  /** Id of the library item being streamed. */
  get libraryItemId() {
    return this.libraryItem.id
  }

  /** Title from the library item's media metadata, or an empty string. */
  get mediaTitle() {
    return this.libraryItem.media.metadata.title || ''
  }

  get mediaEntityName() {
    return this.mediaEntity.name
  }

  /** Display title combining the media title and the media entity name. */
  get itemTitle() {
    return `${this.mediaTitle} (${this.mediaEntityName})`
  }

  /** Duration of the media entity (seconds). */
  get totalDuration() {
    return this.mediaEntity.duration
  }

  get tracks() {
    return this.mediaEntity.tracks
  }

  /** Format of the first track, or null when there are no tracks. */
  get tracksAudioFileType() {
    if (!this.tracks.length) return null
    return this.tracks[0].metadata.format
  }

  get userToken() {
    return this.user.token
  }

  // Fmp4 does not work on iOS devices: https://github.com/advplyr/audiobookshelf-app/issues/85
  // Workaround: Force AAC transcode for FLAC
  get hlsSegmentType() {
    return 'mpegts'
  }

  /** ffmpeg filename pattern for segments; `%d` is the segment number. */
  get segmentBasename() {
    if (this.hlsSegmentType === 'fmp4') return 'output-%d.m4s'
    return 'output-%d.ts'
  }

  /** Number of the first segment the transcode produces (start time minus the seek-back window). */
  get segmentStartNumber() {
    if (!this.startTime) return 0
    return Math.floor(Math.max(this.startTime - this.maxSeekBackTime, 0) / this.segmentLength)
  }

  /** Total number of segments needed to cover the full duration. */
  get numSegments() {
    let numSegs = Math.floor(this.totalDuration / this.segmentLength)
    // A trailing partial segment still counts
    if (this.totalDuration - (numSegs * this.segmentLength) > 0) {
      numSegs++
    }
    return numSegs
  }

  get clientPlaylistUri() {
    return `/hls/${this.id}/output.m3u8`
  }

  // get clientProgress() {
  //   if (!this.clientCurrentTime) return 0
  //   var prog = Math.min(1, this.clientCurrentTime / this.totalDuration)
  //   return Number(prog.toFixed(3))
  // }

  /** True when the first track's format is one of mp4/m4a/m4b. */
  get isAACEncodable() {
    return ['mp4', 'm4a', 'm4b'].includes(this.tracksAudioFileType)
  }

  get transcodeForceAAC() {
    return !!this.transcodeOptions.forceAAC
  }

  /** Plain-object snapshot sent to the client with `stream_open`. */
  toJSON() {
    return {
      id: this.id,
      userId: this.user.id,
      libraryItem: this.libraryItem.toJSONExpanded(),
      segmentLength: this.segmentLength,
      playlistPath: this.playlistPath,
      clientPlaylistUri: this.clientPlaylistUri,
      // clientCurrentTime: this.clientCurrentTime,
      startTime: this.startTime,
      segmentStartNumber: this.segmentStartNumber,
      isTranscodeComplete: this.isTranscodeComplete,
      // lastUpdate: this.clientUserAudiobookData ? this.clientUserAudiobookData.lastUpdate : 0
    }
  }

  /** Currently a no-op; the previous progress-restore logic is kept below for reference. */
  init() {
    // if (this.clientUserAudiobookData) {
    //   var timeRemaining = this.totalDuration - this.clientUserAudiobookData.currentTime
    //   Logger.info('[STREAM] User has progress for item', this.clientUserAudiobookData.progress, `Time Remaining: ${timeRemaining}s`)
    //   if (timeRemaining > 15) {
    //     this.startTime = this.clientUserAudiobookData.currentTime
    //     this.clientCurrentTime = this.startTime
    //   }
    // }
  }

  /**
   * Decide whether a requested segment requires restarting the transcode.
   *
   * The transcode is restarted when the segment lies before the first
   * segment the current transcode produces, or more than 10 segments past
   * the furthest segment seen on disk.
   *
   * @param {number} segNum - Requested segment number.
   * @returns {Promise<number|false>} Segment start time in seconds when a reset was requested, otherwise false.
   */
  async checkSegmentNumberRequest(segNum) {
    const segStartTime = segNum * this.segmentLength

    // Compare against the first transcoded segment rather than startTime:
    // the transcode begins maxSeekBackTime seconds before startTime, so
    // segments inside that window are produced and need no reset.
    if (segNum < this.segmentStartNumber) {
      Logger.warn(`[STREAM] Segment #${segNum} Request @${secondsToTimestamp(segStartTime)} is before start time (${secondsToTimestamp(this.startTime)}) - Reset Transcode`)
      await this.reset(segStartTime - (this.segmentLength * 2))
      return segStartTime
    }
    if (this.isTranscodeComplete) {
      return false
    }

    const distanceFromFurthestSegment = segNum - this.furthestSegmentCreated
    if (distanceFromFurthestSegment > 10) {
      Logger.info(`Segment #${segNum} requested is ${distanceFromFurthestSegment} segments from latest (${secondsToTimestamp(segStartTime)}) - Reset Transcode`)
      await this.reset(segStartTime - (this.segmentLength * 2))
      return segStartTime
    }

    return false
  }

  /**
   * Ensure the session directory exists and write the client playlist.
   * @returns {Promise<string>} URI the client uses to fetch the playlist.
   */
  async generatePlaylist() {
    await fs.ensureDir(this.streamPath)
    await hlsPlaylistGenerator(this.playlistPath, 'output', this.totalDuration, this.segmentLength, this.hlsSegmentType, this.userToken)
    return this.clientPlaylistUri
  }

  /**
   * Scan the session directory for segment files, update the set of known
   * segments and emit `stream_progress` (plus `stream_open` the first time
   * more than 6 segments exist). Errors are logged, never thrown.
   */
  async checkFiles() {
    try {
      const files = await fs.readdir(this.streamPath)
      files.forEach((file) => {
        const extname = Path.extname(file)
        if (extname !== '.ts' && extname !== '.m4s') return

        // Segment files are named output-<number>; skip anything without a
        // numeric suffix so NaN/0 never pollute the set.
        const numPart = Path.basename(file, extname).split('-')[1]
        if (!/^\d+$/.test(numPart)) return

        this.segmentsCreated.add(Number(numPart))
      })

      if (!this.segmentsCreated.size) {
        Logger.warn('No Segments')
        return
      }

      if (this.segmentsCreated.size > 6 && !this.isClientInitialized) {
        this.isClientInitialized = true
        Logger.info(`[STREAM] ${this.id} notifying client that stream is ready`)
        this.clientEmit('stream_open', this.toJSON())
      }

      const segments = Array.from(this.segmentsCreated).sort((a, b) => a - b)
      const lastSegment = segments[segments.length - 1]
      if (lastSegment > this.furthestSegmentCreated) {
        this.furthestSegmentCreated = lastSegment
      }

      // Collapse consecutive segment numbers into "first-last" ranges;
      // isolated segments are reported as plain numbers.
      const chunks = []
      let rangeStart = segments[0]
      let rangeEnd = segments[0]
      const pushRange = () => {
        chunks.push(rangeStart === rangeEnd ? rangeStart : `${rangeStart}-${rangeEnd}`)
      }
      for (const seg of segments.slice(1)) {
        if (seg === rangeEnd + 1) {
          rangeEnd = seg
        } else {
          pushRange()
          rangeStart = seg
          rangeEnd = seg
        }
      }
      pushRange()

      const perc = (this.segmentsCreated.size * 100 / this.numSegments).toFixed(2) + '%'
      Logger.info('[STREAM-CHECK] Check Files', this.segmentsCreated.size, 'of', this.numSegments, perc, `Furthest Segment: ${this.furthestSegmentCreated}`)
      // Logger.debug('[STREAM-CHECK] Chunks', chunks.join(', '))

      this.clientEmit('stream_progress', {
        stream: this.id,
        percent: perc,
        chunks,
        numSegments: this.numSegments
      })
    } catch (error) {
      Logger.error('Failed checking files', error)
    }
  }

  /**
   * (Re)start the 2 second polling timer that calls `checkFiles()` while the
   * transcode is running.
   */
  startLoop() {
    this.clientEmit('stream_progress', { stream: this.id, chunks: [], numSegments: 0, percent: '0%' })

    clearInterval(this.loop)
    const intervalId = setInterval(() => {
      if (!this.isTranscodeComplete) {
        this.checkFiles()
      } else {
        // NOTE(review): the ffmpeg 'end' handler clears this.loop when it sets
        // isTranscodeComplete, so this branch only runs for a timer created
        // after the transcode already ended - confirm that is intended.
        Logger.info(`[Stream] ${this.itemTitle} sending stream_ready`)
        this.clientEmit('stream_ready')
        clearInterval(intervalId)
      }
    }, 2000)
    this.loop = intervalId
  }

  /**
   * Build and run the ffmpeg HLS transcode starting `maxSeekBackTime`
   * seconds before `startTime`.
   *
   * Rejects if the concat file cannot be written; ffmpeg failures after the
   * command is running are handled by the `error` listener instead.
   */
  async start() {
    Logger.info(`[STREAM] START STREAM - Num Segments: ${this.numSegments}`)

    const transcode = Ffmpeg()
    this.ffmpeg = transcode

    const adjustedStartTime = Math.max(this.startTime - this.maxSeekBackTime, 0)
    let trackStartTime
    try {
      trackStartTime = await writeConcatFile(this.tracks, this.concatFilesPath, adjustedStartTime)
    } catch (error) {
      // Nothing was spawned; do not leave a dead command behind for reset()/close()
      if (this.ffmpeg === transcode) this.ffmpeg = null
      throw error
    }

    transcode.addInput(this.concatFilesPath)
    // seek_timestamp : https://ffmpeg.org/ffmpeg.html
    // the argument to the -ss option is considered an actual timestamp, and is not offset by the start time of the file
    // fixes https://github.com/advplyr/audiobookshelf/issues/116
    transcode.inputOption('-seek_timestamp 1')
    transcode.inputFormat('concat')
    transcode.inputOption('-safe 0')

    if (adjustedStartTime > 0) {
      const shiftedStartTime = adjustedStartTime - trackStartTime
      // Issues using exact fractional seconds i.e. 29.49814 - changing to 29.5s
      const startTimeS = Math.round(shiftedStartTime * 10) / 10 + 's'
      Logger.info(`[STREAM] Starting Stream at startTime ${secondsToTimestamp(adjustedStartTime)} (User startTime ${secondsToTimestamp(this.startTime)}) and Segment #${this.segmentStartNumber}`)
      transcode.inputOption(`-ss ${startTimeS}`)
      transcode.inputOption('-noaccurate_seek')
    }

    const logLevel = process.env.NODE_ENV === 'production' ? 'error' : 'warning'
    const needsAAC = this.tracksAudioFileType === 'flac' || this.tracksAudioFileType === 'opus' || this.transcodeForceAAC
    const audioCodec = needsAAC ? 'aac' : 'copy'
    transcode.addOption([
      `-loglevel ${logLevel}`,
      '-map 0:a',
      `-c:a ${audioCodec}`
    ])

    const hlsOptions = [
      '-f hls',
      '-copyts',
      '-avoid_negative_ts make_non_negative',
      '-max_delay 5000000',
      '-max_muxing_queue_size 2048',
      // Kept in sync with the segment maths used everywhere else in this class
      `-hls_time ${this.segmentLength}`,
      `-hls_segment_type ${this.hlsSegmentType}`,
      `-start_number ${this.segmentStartNumber}`,
      '-hls_playlist_type vod',
      '-hls_list_size 0',
      '-hls_allow_cache 0'
    ]
    if (this.hlsSegmentType === 'fmp4') {
      hlsOptions.push('-strict -2')
      const fmp4InitFilename = Path.join(this.streamPath, 'init.mp4')
      // var fmp4InitFilename = 'init.mp4'
      hlsOptions.push(`-hls_fmp4_init_filename ${fmp4InitFilename}`)
    }
    transcode.addOption(hlsOptions)

    const segmentFilename = Path.join(this.streamPath, this.segmentBasename)
    transcode.addOption(`-hls_segment_filename ${segmentFilename}`)
    transcode.output(this.finalPlaylistPath)

    transcode.on('start', (command) => {
      Logger.info('[INFO] FFMPEG transcoding started with command: ' + command)
      Logger.info('')
      if (this.isResetting) {
        // AAC encode is much slower
        const clearIsResettingTime = this.transcodeForceAAC ? 3000 : 500
        setTimeout(() => {
          Logger.info('[STREAM] Clearing isResetting')
          this.isResetting = false
          this.startLoop()
        }, clearIsResettingTime)
      } else {
        this.startLoop()
      }
    })

    transcode.on('stderr', (stdErrline) => {
      Logger.info(stdErrline)
    })

    transcode.on('error', (err, stdout, stderr) => {
      // The process is gone whatever the cause. Dropping the handle here keeps
      // reset()/close() from trying to kill it again and waiting for an exit
      // that already happened.
      this.ffmpeg = null
      clearInterval(this.loop)

      if (err.message && err.message.includes('SIGKILL')) {
        // This is an intentional SIGKILL
        Logger.info('[FFMPEG] Transcode Killed')
        return
      }

      Logger.error('Ffmpeg Err', '"' + err.message + '"')

      // Temporary workaround for https://github.com/advplyr/audiobookshelf/issues/172
      const aacErrorMsg = 'ffmpeg exited with code 1: Could not write header for output file #0 (incorrect codec parameters ?)'
      if (audioCodec === 'copy' && this.isAACEncodable && err.message && err.message.startsWith(aacErrorMsg)) {
        Logger.info(`[Stream] Re-attempting stream with AAC encode`)
        this.transcodeOptions.forceAAC = true
        this.reset(this.startTime).catch((error) => {
          Logger.error('[Stream] Failed to re-attempt stream with AAC encode', error)
        })
      } else {
        // Close stream show error
        this.close(err.message).catch((error) => {
          Logger.error('[Stream] Failed to close stream after ffmpeg error', error)
        })
      }
    })

    transcode.on('end', (stdout, stderr) => {
      Logger.info('[FFMPEG] Transcoding ended')
      // For very small fast load
      if (!this.isClientInitialized) {
        this.isClientInitialized = true
        Logger.info(`[STREAM] ${this.id} notifying client that stream is ready`)
        this.clientEmit('stream_open', this.toJSON())
      }
      this.isTranscodeComplete = true
      this.ffmpeg = null
      clearInterval(this.loop)
    })

    transcode.run()
  }

  /**
   * Stop the transcode, delete the session directory and notify the client.
   * Emits `stream_error` when `errorMessage` is given, otherwise
   * `stream_closed`; always emits the local `closed` event afterwards.
   *
   * @param {string|null} [errorMessage]
   */
  async close(errorMessage = null) {
    clearInterval(this.loop)

    Logger.info('Closing Stream', this.id)
    if (this.ffmpeg) {
      this.ffmpeg.kill('SIGKILL')
    }

    // Best effort: a failed cleanup must not prevent the close notifications
    try {
      await fs.remove(this.streamPath)
      Logger.info('Deleted session data', this.streamPath)
    } catch (err) {
      Logger.error('Failed to delete session data', err)
    }

    if (errorMessage) {
      this.clientEmit('stream_error', { id: this.id, error: String(errorMessage).trim() })
    } else {
      this.clientEmit('stream_closed', this.id)
    }

    this.emit('closed')
  }

  /** Stop polling and SIGKILL the running ffmpeg command, if any. */
  cancelTranscode() {
    clearInterval(this.loop)
    if (this.ffmpeg) {
      this.ffmpeg.kill('SIGKILL')
    }
  }

  /**
   * Poll every 500ms (at most 20 times) until the ffmpeg handle is cleared.
   * @returns {Promise<boolean>} true once cleared, false if it never was.
   */
  async waitCancelTranscode() {
    for (let i = 0; i < 20; i++) {
      if (!this.ffmpeg) return true
      await new Promise((resolve) => setTimeout(resolve, 500))
    }
    Logger.error('[STREAM] Transcode never closed...')
    return false
  }

  /**
   * Restart the transcode from a new start time. Ignored while another
   * reset is still in progress.
   *
   * @param {number} time - New start time in seconds; negative values are clamped to 0.
   */
  async reset(time) {
    if (this.isResetting) {
      return Logger.info(`[STREAM] Stream ${this.id} already resetting`)
    }
    time = Math.max(0, time)

    this.isResetting = true

    if (this.ffmpeg) {
      this.cancelTranscode()
      await this.waitCancelTranscode()
    }

    this.isTranscodeComplete = false
    this.startTime = time
    // this.clientCurrentTime = this.startTime
    Logger.info(`Stream Reset New Start Time ${secondsToTimestamp(this.startTime)}`)

    try {
      await this.start()
    } catch (error) {
      // isResetting is normally cleared by the ffmpeg 'start' event; if the
      // transcode never started, clear it here so later resets are not blocked.
      Logger.error('[STREAM] Failed to restart transcode', error)
      this.isResetting = false
    }
  }

  /** Forward an event to the client emitter (if one was provided) on behalf of this stream's user. */
  clientEmit(evtName, data) {
    if (this.clientEmitter) this.clientEmitter(this.user.id, evtName, data)
  }

  /** Build an AudioTrack populated from this stream's title, duration and playlist URI. */
  getAudioTrack() {
    const newAudioTrack = new AudioTrack()
    newAudioTrack.setFromStream(this.itemTitle, this.totalDuration, this.clientPlaylistUri)
    return newAudioTrack
  }
}
// The Stream class is this module's sole export.
module . exports = Stream