@@ -136,55 +136,49 @@ def update_cached(subfolder=None):
     print("Added %s contenders for processing queue." % count)
 
 
-def start_queue(subfolder=None, max_count=None, to_storage=True):
+def start_queue(subfolder=None, max_count=None):
     '''
     start queue will be used to move new Batches (jobs) from the QUEUE to be
     run with celery tasks. The status is changed from QUEUE to NEW when this is done.
     If the QUEUE is empty, we parse the filesystem (and queue new jobs) again.
     This job submission is done all at once to ensure that we don't have race
     conditions of multiple workers trying to grab a job at the same time.
     '''
-    from sendit.apps.main.tasks import import_dicomdir, upload_storage
+    from sendit.apps.main.tasks import import_dicomdir
 
     contenders = Batch.objects.filter(status="QUEUE")
     if len(contenders) == 0:
         update_cached(subfolder)
         contenders = Batch.objects.filter(status="QUEUE")
 
     started = 0
-    batch_ids = []
     for batch in contenders:
-
         # not seen folders in queue
         dicom_dir = batch.logs.get('DICOM_DIR')
         if dicom_dir is not None:
             import_dicomdir.apply_async(kwargs={"dicom_dir": dicom_dir})
-
-            # Run an upload batch every 1000
-            if to_storage is True:
-                if len(batch_ids) % 1000 == 0:
-                    upload_storage.apply_async(kwargs={"batch_ids": batch_ids})
-                    batch_ids = []
-                batch_ids.append(batch.id)
             started += 1
             if max_count is not None:
                 if started >= max_count:
                     break
 
-    # Upload remaining
-    if to_storage is True:
-        upload_storage.apply_async(kwargs={"batch_ids": batch_ids})
-        upload_storage.apply_async()  # any remaining
     print("Added %s tasks to the active queue." % started)
 
 
-def upload_finished():
+def upload_finished(batches=False, chunk_size=1000):
     '''upload finished will upload datasets with status DONEPROCESSING
     to google storage. We do this with one worker to reduce the number
     of concurrent API calls. In the future, this will be better optimized.
     '''
     from sendit.apps.main.tasks import upload_storage
-    upload_storage.apply_async()
+    from sendit.apps.main.tasks.utils import chunks
+
+    if batches is False:
+        upload_storage.apply_async()
+    else:
+        batch_ids = [b.id for b in Batch.objects.filter(status="DONEPROCESSING")]
+        for subset in chunks(batch_ids, chunk_size):
+            upload_storage.apply_async(kwargs={"batch_ids": subset})
 
 
 def get_contenders(base, current=None, filters=None):
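
The new upload_finished path calls a chunks helper imported from sendit.apps.main.tasks.utils, which is not shown in this hunk. A minimal sketch of what such a helper could look like, assuming it only needs to split a list of batch ids into fixed-size slices (the real implementation may differ):

def chunks(items, n):
    '''Yield successive slices of at most n items.
       Assumed signature, inferred from the call chunks(batch_ids, chunk_size)
       in upload_finished above; not taken from this diff.
    '''
    for i in range(0, len(items), n):
        yield items[i:i + n]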
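
With this change, queueing imports and uploading finished batches are decoupled: start_queue only dispatches import_dicomdir tasks, and upload_finished handles storage uploads, optionally one task per chunk of batch ids. A hypothetical caller might drive them like the sketch below; the module path is an assumption, since the diff does not show which file defines these helpers:

# Hypothetical usage of the updated helpers; the import path is an assumption.
from sendit.apps.main.utils import start_queue, upload_finished

# Queue up to 500 import_dicomdir tasks; uploads are no longer interleaved here.
start_queue(max_count=500)

# Default behavior: a single upload_storage task handles everything DONEPROCESSING.
upload_finished()

# Chunked behavior: one upload_storage task per slice of 1000 batch ids.
upload_finished(batches=True, chunk_size=1000)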