@@ -24,22 +24,24 @@ const logJsonFeederTag = "JS-FEEDER"
2424
2525// jsonFeeder generates a stream of user requests reading from a json file.
2626type jsonFeeder struct {
27- collector * metrics.Collector // Metrics collector that collects system level metrics.
28- reqInjectionNode sync.Map // Map of ContainerID<->NodeIndex.
29- systemTotalResources types.Resources // Caravela's maximum resources.
30- randomGenerator * rand.Rand // Pseudo-random generator.
31- simConfigs * configuration.Configuration // Simulator's configurations.
32- caravelaConfigs * caravelaConfigs.Configuration // Caravela's configurations.
27+ collector * metrics.Collector // Metrics collector that collects system level metrics.
28+ containerInjectionNode sync.Map // Map of ContainerID<->NodeIndex.
29+ currentRequests sync.Map // Map of RequestID<->ContainerID.
30+ systemTotalResources types.Resources // Caravela's maximum resources.
31+ randomGenerator * rand.Rand // Pseudo-random generator.
32+ simConfigs * configuration.Configuration // Simulator's configurations.
33+ caravelaConfigs * caravelaConfigs.Configuration // Caravela's configurations.
3334}
3435
3536// newJsonFeeder creates a new json feeder.
3637func newJsonFeeder (simConfigs * configuration.Configuration , caravelaConfigs * caravelaConfigs.Configuration , rngSeed int64 ) (Feeder , error ) {
3738return & jsonFeeder {
38- collector : nil ,
39- reqInjectionNode : sync.Map {},
40- randomGenerator : rand .New (caravelaUtil .NewSourceSafe (rand .NewSource (rngSeed ))),
41- simConfigs : simConfigs ,
42- caravelaConfigs : caravelaConfigs ,
39+ collector : nil ,
40+ containerInjectionNode : sync.Map {},
41+ currentRequests : sync.Map {},
42+ randomGenerator : rand .New (caravelaUtil .NewSourceSafe (rand .NewSource (rngSeed ))),
43+ simConfigs : simConfigs ,
44+ caravelaConfigs : caravelaConfigs ,
4345}, nil
4446}
4547
@@ -57,18 +59,11 @@ func (j *jsonFeeder) Start(ticksChannel <-chan chan RequestTask) {
5759CPUs float64 `json:"CPU request"`
5860Memory float64 `json:"memory request"`
5961}
62+ const maxJsonFiles = 18
63+ jsonFileCounter := 0
6064tick := 0
6165
62- fileReader , err := os .Open ("RequestStream.js" )
63- if err != nil {
64- panic (fmt .Errorf ("json feeder invalid request stream file: %s" , err ))
65- }
66-
67- jsonRequestStream := json .NewDecoder (fileReader )
68- _ , err = jsonRequestStream .Token () // Read open bracket
69- if err != nil {
70- panic (err )
71- }
66+ jsonRequestStream , currentJsonFile := j .getJsonRequestStream (fmt .Sprintf ("in/Stream_%d.js" , jsonFileCounter ))
7267
7368for {
7469select {
@@ -87,67 +82,104 @@ func (j *jsonFeeder) Start(ticksChannel <-chan chan RequestTask) {
8782
8883requestID := strconv .FormatInt (reqJson .JobID , 10 )
8984reqResources := j .generateRequestResources (reqJson .CPUs , reqJson .Memory )
90- tickCpusAcc += reqResources .CPUs
91- tickMemoryAcc += reqResources .Memory
9285
9386if reqJson .EventType == 1 && ! j .requestExists (requestID ) { // Deploy container request.
9487
88+ tickCpusAcc += reqResources .CPUs
89+ tickMemoryAcc += reqResources .Memory
90+ j .currentRequests .Store (requestID , nil )
91+
9592newTickChan <- func (nodeIndex int , injectedNode * node.Node , currentTime time.Duration ) {
96- requestCtx := context .WithValue (context .Background (), types .RequestIDKey , requestID )
97- j .collector .CreateRunRequest (nodeIndex , requestID , reqResources , currentTime )
98- contStatus , err := injectedNode .SubmitContainers (
99- requestCtx ,
100- []types.ContainerConfig {{
101- ImageKey : util .RandomName (),
102- Name : util .RandomName (),
103- PortMappings : caravela .EmptyPortMappings (),
104- Args : caravela .EmptyContainerArgs (),
105- Resources : reqResources ,
106- GroupPolicy : types .SpreadGroupPolicy ,
107- }})
108- if err == nil {
109- j .reqInjectionNode .Store (contStatus [0 ].ContainerID , injectedNode )
110- j .collector .RunRequestSucceeded ()
93+ if _ , exist := j .currentRequests .Load (requestID ); ! exist {
94+ requestCtx := context .WithValue (context .Background (), types .RequestIDKey , requestID )
95+ j .collector .CreateRunRequest (nodeIndex , requestID , reqResources , currentTime )
96+ contStatus , err := injectedNode .SubmitContainers (
97+ requestCtx ,
98+ []types.ContainerConfig {{
99+ ImageKey : util .RandomName (),
100+ Name : util .RandomName (),
101+ PortMappings : caravela .EmptyPortMappings (),
102+ Args : caravela .EmptyContainerArgs (),
103+ Resources : reqResources ,
104+ GroupPolicy : types .SpreadGroupPolicy ,
105+ }})
106+ if err == nil {
107+ j .containerInjectionNode .Store (contStatus [0 ].ContainerID , injectedNode )
108+ j .currentRequests .Store (requestID , contStatus [0 ].ContainerID )
109+ j .collector .RunRequestSucceeded ()
110+ }
111+ j .collector .ArchiveRunRequest (requestID )
111112}
112- j .collector .ArchiveRunRequest (requestID )
113113}
114114
115115} else if (reqJson .EventType == 2 || reqJson .EventType == 3 || reqJson .EventType == 4 ||
116116reqJson .EventType == 5 || reqJson .EventType == 6 ) && j .requestExists (requestID ) { // Stop container request.
117117
118- j .reqInjectionNode .Range (func (key , value interface {}) bool {
119- containerID , _ := key .(string )
120- injectionNode , _ := value .(* node.Node )
121- newTickChan <- func (_ int , _ * node.Node , _ time.Duration ) {
122- err := injectionNode .StopContainers (context .Background (), []string {containerID })
123- if err != nil {
124- //util.Log.Infof(util.LogTag(logRandFeederTag)+"Stop container FAILED, err: %s", err)
125- }
126- j .reqInjectionNode .Delete (containerID )
118+ contID , exist := j .currentRequests .Load (requestID )
119+ containerID , ok := contID .(string )
120+ if ! exist || ! ok {
121+ j .currentRequests .Delete (requestID )
122+ continue
123+ }
124+
125+ injNode , exist := j .containerInjectionNode .Load (containerID )
126+ if ! exist {
127+ continue
128+ //panic(fmt.Errorf("no mapping between request and container, %s", containerID))
129+ }
130+ injectionNode , ok := injNode .(* node.Node )
131+ if ! ok {
132+ panic ("node to *node.Node" )
133+ }
134+
135+ newTickChan <- func (_ int , _ * node.Node , _ time.Duration ) {
136+ err := injectionNode .StopContainers (context .Background (), []string {containerID })
137+ if err != nil {
138+ //util.Log.Infof(util.LogTag(logRandFeederTag)+"Stop container FAILED, err: %s", err)
127139}
128- return false
129- })
140+ j .currentRequests .Delete (requestID )
141+ j .containerInjectionNode .Delete (containerID )
142+ }
130143
131144}
132145
133146}
134147
135- if ! jsonRequestStream .More () {
136- fileReader .Close () // Close the request file.
137- close (newTickChan ) // No more user requests for this tick.
148+ if ! jsonRequestStream .More () && (jsonFileCounter == maxJsonFiles ) {
149+ fmt .Println ("ASDDDDDDDDDDDDDDDDDDDDDDDD" )
150+ currentJsonFile .Close () // Close the request file.
151+ close (newTickChan ) // No more user requests for this tick.
138152return
153+ } else if ! jsonRequestStream .More () {
154+ jsonFileCounter ++
155+ currentJsonFile .Close () // Close the request file.
156+ jsonRequestStream , currentJsonFile = j .getJsonRequestStream (fmt .Sprintf ("in/Stream_%d.js" , jsonFileCounter ))
139157}
140158
141159close (newTickChan ) // No more user requests for this tick.
142160} else { // Simulator closed ticks channel.
143- fileReader .Close () // Close the request file.
144- return // Stop feeding engine
161+ currentJsonFile .Close () // Close the request file.
162+ return // Stop feeding engine
145163}
146164}
147165tick ++
148166}
149167}
150168
169+ func (j * jsonFeeder ) getJsonRequestStream (filePath string ) (* json.Decoder , * os.File ) {
170+ fileReader , err := os .Open (filePath )
171+ if err != nil {
172+ panic (fmt .Errorf ("json feeder invalid request stream file: %s" , err ))
173+ }
174+
175+ jsonRequestStream := json .NewDecoder (fileReader )
176+ _ , err = jsonRequestStream .Token () // Read open bracket
177+ if err != nil {
178+ panic (err )
179+ }
180+ return jsonRequestStream , fileReader
181+ }
182+
151183// generateRequestResources ...
152184func (j * jsonFeeder ) generateRequestResources (normalizedCpus , normalizedMemory float64 ) types.Resources {
153185cpuClasses := make ([]int , len (j .caravelaConfigs .ResourcesPartitions ().CPUClasses ))
@@ -195,6 +227,6 @@ func (j *jsonFeeder) ratioSystemResources(cpus, memory int) float64 {
195227
196228// requestExists verifies if a request was already injected in the request stream.
197229func (j * jsonFeeder ) requestExists (requestID string ) bool {
198- _ , ok := j .reqInjectionNode .Load (requestID )
199- return ok
230+ _ , exist := j .currentRequests .Load (requestID )
231+ return exist
200232}
0 commit comments