@@ -279,49 +279,47 @@ void WriteBufferProxy::write_loop(void)
279279
280280using namespace std ::chrono;
281281
282- // Keep distinct clocks for atoms and values.
283- // That's because the first writer delays the second writer
284- steady_clock::time_point atostart = steady_clock::now ();
285- steady_clock::time_point valstart = atostart;
286-
287282_ticker = 0.25 * _decay;
288283if (10.0 < _ticker) _ticker = 10.0 ;
289284
290- // Set a minimum write value, at half of this .
291- double minfrac = _ticker / _decay;
285+ // Set amount to write per cycle .
286+ double frac = _ticker / _decay;
292287
293288// After opening, sleep for a little while
294289uint nappy = 1 + ceil (1000.0 * _ticker);
290+ std::this_thread::sleep_for (milliseconds (nappy));
291+
292+ // Start with non-zero moving avg, approximating what it should be.
293+ _mavg_in_atoms = _astore;
294+ _mavg_in_values = _vstore;
295+ _mavg_buf_atoms = (double ) _atom_queue.size ();
296+ _mavg_buf_values = (double ) _value_queue.size ();
297+ _mavg_out_atoms = frac * _mavg_buf_atoms;
298+ _mavg_out_values = frac * _mavg_buf_values;
295299
296- while (not _stop)
300+ while (not _stop)
297301{
298- if ( 0 < nappy) std::this_thread::sleep_for ( milliseconds (nappy) );
302+ steady_clock::time_point awake = steady_clock::now ( );
299303
300304bool wrote = false ;
301- steady_clock::time_point awake = steady_clock::now ();
302305if (not _atom_queue.is_empty ())
303306{
304307wrote = true ;
305308
306- // How long have we slept, in seconds?
307- double waited = duration_cast<duration<double >>(awake-atostart).count ();
308- // What fraction of the decay time is that?
309- double frac = waited / _decay;
310-
311- // How many Atoms awaiting to be written?
309+ // How many Atoms waiting to be written?
312310double qsz = (double ) _atom_queue.size ();
313311
312+ // How many should we write?
313+ uint nwrite = ceil (frac * qsz);
314+
314315// Moving average of the last ten writes. Is that OK?
315316#define WEI 0.1
316317_mavg_buf_atoms = (1.0 -WEI) * _mavg_buf_atoms + WEI * qsz;
317318
318- // How many should we write?
319- uint nwrite = ceil (frac * qsz);
320-
321319// Whats the min to write? The goal here is to not
322320// dribble out the tail, but to push it out, if its
323321// almost all gone anyway.
324- uint mwr = ceil (0.5 * minfrac * _mavg_buf_atoms);
322+ uint mwr = ceil (0.5 * frac * _mavg_buf_atoms);
325323if (mwr < 1000 ) mwr = 1000 ;
326324if (nwrite < mwr) nwrite = mwr;
327325
@@ -335,30 +333,23 @@ void WriteBufferProxy::write_loop(void)
335333_astore = 0 ;
336334_mavg_out_atoms = (1.0 -WEI) * _mavg_out_atoms + WEI * avec.size ();
337335}
338- atostart = awake;
339-
340- // Re-measure, because above may have taken a long time.
341- steady_clock::time_point vwake = steady_clock::now ();
342336
343337// Cut-n-paste of above.
344338if (not _value_queue.is_empty ())
345339{
346340wrote = true ;
347341
348- // How long have we slept, in seconds?
349- double waited = duration_cast<duration<double >>(vwake-valstart).count ();
350- // What fraction of the decay time is that?
351- double frac = waited / _decay;
352-
353342// How many values are waiting to be written?
354343double qsz = (double ) _value_queue.size ();
355- _mavg_buf_values = (1.0 -WEI) * _mavg_buf_values + WEI * qsz;
356344
357345// How many should we write?
358346uint nwrite = ceil (frac * qsz);
359347
348+ // Moving avg
349+ _mavg_buf_values = (1.0 -WEI) * _mavg_buf_values + WEI * qsz;
350+
360351// Min to write.
361- uint mwr = ceil (0.5 * minfrac * _mavg_buf_values);
352+ uint mwr = ceil (0.5 * frac * _mavg_buf_values);
362353if (nwrite < mwr) nwrite = mwr;
363354
364355// Store that many
@@ -372,13 +363,17 @@ void WriteBufferProxy::write_loop(void)
372363_vstore = 0 ;
373364_mavg_out_values = (1.0 -WEI) * _mavg_out_values + WEI * vav.size ();
374365}
375- valstart = vwake ;
366+ if (wrote) _ndumps ++ ;
376367
377368// How much time have we used up so far?
378- steady_clock::time_point elap = steady_clock::now ();
379- double used = duration_cast<duration<double >>(elap-awake).count ();
369+ steady_clock::time_point wrdone = steady_clock::now ();
370+ double wrtime = duration_cast<duration<double >>(wrdone-awake).count ();
371+
372+ // Moving averge duty factor.
373+ _mavg_load = (1.0 -WEI) * _mavg_load + WEI * wrtime / _ticker;
374+
380375// How much time do we have left to sleep?
381- double left = _ticker - used ;
376+ double left = _ticker - wrtime ;
382377if (0.0 < left)
383378{
384379nappy = floor (1000.0 * left);
@@ -389,6 +384,7 @@ void WriteBufferProxy::write_loop(void)
389384_high_water_mark *= 17 ;
390385_high_water_mark /= 16 ;
391386}
387+ std::this_thread::sleep_for (milliseconds (nappy));
392388}
393389else
394390{
@@ -414,12 +410,9 @@ void WriteBufferProxy::write_loop(void)
414410left = 0.0 ;
415411double worst = fmax (_mavg_out_atoms, _mavg_out_values);
416412#define DUTY_CYCLE 1.2
417- _high_water_mark = DUTY_CYCLE * worst * _decay / used ;
413+ _high_water_mark = DUTY_CYCLE * worst * _decay / wrtime ;
418414_nstalls ++;
419415}
420-
421- if (wrote) _ndumps ++;
422- _mavg_load = (1.0 -WEI) * _mavg_load + WEI * used / _ticker;
423416}
424417}
425418
0 commit comments