lock/unlock
You could try adding a "lock" state to the Deployer
-
// Deployer.js
/**
 * Queues transaction data and drains the queue on a fixed 5-second timer.
 *
 * `write` may be called at any time. `read` processes queued items one at a
 * time in FIFO order and is guarded by a lock flag, so a timer tick that fires
 * while an earlier drain is still awaiting returns immediately.
 */
class Deployer {
  protected txDataList: Array<TXData>
  protected isLocked: boolean // true while a drain is in progress

  constructor() {
    this.txDataList = []
    this.isLocked = false
    // read() is fire-and-forget here, so report a failure instead of leaving
    // an unhandled promise rejection.
    setInterval(() => {
      this.read().catch(err => console.error(err))
    }, 5000)
  }

  /**
   * Drains the queue, awaiting `doSomething` for each item in FIFO order.
   * Returns immediately if another drain is already running. A null/undefined
   * entry ends the pass.
   *
   * @returns a promise that settles when the pass ends; it rejects if
   *   `doSomething` rejects or throws (the failing item has already been
   *   removed from the queue).
   */
  async read(): Promise<void> {
    if (this.isLocked) return // a drain is already running
    this.isLocked = true
    try {
      while (true) {
        const tx = this.txDataList.shift() // fifo tx
        if (tx == null) break // nothing left to process
        await doSomething(tx) // process each tx
      }
    } finally {
      // Always unlock, otherwise one failed item would block every later read.
      this.isLocked = false
    }
  }

  /**
   * Appends a transaction to the end of the queue.
   *
   * @param tx item to be processed by a later `read`
   */
  write(tx: TXData): void {
    this.txDataList.push(tx)
  }
}
export default Deployer
// some-service.js
import Deployer from "./Deployer.js"
// Queue every incoming "txData" event on the deployer.
const deployer = new Deployer()
someService.on("txData", (txData) => {
  deployer.write(txData)
})
So we can write
to the deployer at any time, but read
only does any work when the deployer is in an "unlocked" state.
buffer
Maybe Buffer
is a better name because it describes the behavior of the class. TXData
can also be abstracted so that we could use our Buffer for any data. And let's make interval a parameter -
// Buffer.js
/**
 * Generic FIFO buffer that is drained on a timer.
 *
 * `write` may be called at any time. `read` processes buffered items one at a
 * time and is guarded by a lock flag, so a timer tick that fires while an
 * earlier drain is still awaiting returns immediately.
 */
class Buffer<T> { // generic T
  private buffer: Array<T> // generic T
  private isLocked: boolean // true while a drain is in progress

  /**
   * @param options.interval seconds between automatic reads (default 5)
   */
  constructor({ interval = 5 }: { interval?: number } = {}) { // make interval a parameter
    this.buffer = []
    this.isLocked = false
    // read() is fire-and-forget here, so report a failure instead of leaving
    // an unhandled promise rejection.
    setInterval(() => {
      this.read().catch(err => console.error(err))
    }, interval * 1000)
  }

  /**
   * Drains the buffer, awaiting `doSomething` for each item in FIFO order.
   * Returns immediately if another drain is already running. A null/undefined
   * entry ends the pass.
   *
   * @returns a promise that settles when the pass ends; it rejects if
   *   `doSomething` rejects or throws (the failing item has already been
   *   removed from the buffer).
   */
  async read(): Promise<void> {
    if (this.isLocked) return
    this.isLocked = true
    try {
      while (true) {
        const t = this.buffer.shift()
        if (t == null) break
        await doSomething(t)
      }
    } finally {
      // Always unlock, otherwise one failed item would block every later read.
      this.isLocked = false
    }
  }

  /**
   * Appends an item to the end of the buffer.
   *
   * @param t item to be processed by a later `read`
   */
  write(t: T): void { // generic T
    this.buffer.push(t)
  }
}
Now specify the type for Buffer<T>
as Buffer<TXData>
. Optionally set an interval
when constructing the Buffer -
// some-service.js
import Buffer from "./Buffer.js"
// Read every 10 seconds instead of the default 5. The option key must be
// spelled `interval`, otherwise the default is silently used.
const buffer = new Buffer<TXData>({ interval: 10 })
someService.on("txData", tx => buffer.write(tx))
move effect to call site
Next we should get rid of that hard-coded doSomething
. Make onRead
a parameter so the caller can decide what to do with each item -
// Buffer.js
/**
 * Generic FIFO buffer that is drained on a timer, handing each item to a
 * caller-supplied `onRead` callback.
 *
 * `write` may be called at any time. `read` processes buffered items one at a
 * time and is guarded by a lock flag, so a timer tick that fires while an
 * earlier drain is still awaiting returns immediately.
 */
class Buffer<T> {
  private buffer: Array<T>
  private isLocked: boolean // true while a drain is in progress
  private onRead: (x: T) => void | Promise<void> // onRead

  /**
   * @param options.interval seconds between automatic reads (default 5)
   * @param options.onRead called, and awaited, once per buffered item
   *   (default `console.log`)
   */
  constructor({
    interval = 5,
    onRead = console.log,
  }: { interval?: number; onRead?: (x: T) => void | Promise<void> } = {}) { // onRead
    this.buffer = []
    this.isLocked = false
    // Store the callback; without this assignment read() has nothing to call.
    this.onRead = onRead
    // read() is fire-and-forget here, so report a failure instead of leaving
    // an unhandled promise rejection.
    setInterval(() => {
      this.read().catch(err => console.error(err))
    }, interval * 1000)
  }

  /**
   * Drains the buffer, awaiting `onRead` for each item in FIFO order.
   * Returns immediately if another drain is already running. A null/undefined
   * entry ends the pass.
   *
   * @returns a promise that settles when the pass ends; it rejects if
   *   `onRead` rejects or throws (the failing item has already been removed
   *   from the buffer).
   */
  async read(): Promise<void> {
    if (this.isLocked) return
    this.isLocked = true
    try {
      while (true) {
        const t = this.buffer.shift()
        if (t == null) break
        await this.onRead(t) // onRead
      }
    } finally {
      // Always unlock, otherwise one failed item would block every later read.
      this.isLocked = false
    }
  }

  /**
   * Appends an item to the end of the buffer.
   *
   * @param t item to be handed to `onRead` by a later `read`
   */
  write(t: T): void {
    this.buffer.push(t)
  }
}
Caller decides the doSomething
effect -
// some-service.js
import Buffer from "./Buffer.js"
/**
 * Per-item handler that the caller passes to the buffer as `onRead`.
 * The body is elided in this example; it stands in for whatever asynchronous
 * processing a TXData item needs.
 *
 * @param tx the item to process
 */
async function doSomething(tx: TXData) {
await // process tx ...
}
// Configure a 10-second interval and pass doSomething as the onRead callback,
// then queue every incoming "txData" event on the buffer.
const buffer = new Buffer<TXData>({ onRead: doSomething, interval: 10 })
someService.on("txData", (txData) => {
  buffer.write(txData)
})
related
See this Q&A for a related channel
abstraction using async generators.