Pipe is an Android library for building pipelines for executing background tasks.
- Declarative: Uses a Kotlin DSL or builder to declare pipeline schematics.
- Powerful: Supports both simple steps and complex synchronization structures like barriers and aggregators.
- Resilient: Pipe has resiliency baked-in. Has support for step retries in case of failures.
- Android arch-friendly: The API is designed with the Android architecture components in mind, making integration into your apps easy.
Consider a use case where we want to construct the following pipeline:
- Wait for a UI signal to start the pipeline (say, button click)
- Ensure that there is internet connectivity
- Download an image from the given URL
- Rotate the downloaded image by 90 degrees
- Scale the image to a 400px x 400px size
- Overdraw the scaled image on top if its sibling
Pipe uses a Kotlin DSL to declare pipeline schematics. We can express the above pipeline as:
data class ImagePipelineMember(val url: String? = null, val image: Bitmap? = null) val JOBS_REPO = InMemoryRepository<Job<ImagePipelineMember>>() val LOGGER = AndroidLogger("Pipe") fun makePipeline() = buildPipeline<ImagePipelineMember> { setRepository(JOBS_REPO) setLogger(LOGGER) addManualBarrier("start_barrier") addManualBarrier("internet_barrier") addStep("download", attempts = 4) { ImagePipelineMember(image = downloadImage(it.url!!)) } addStep("rotate") { ImagePipelineMember(image = rotateBitmap(it.image!!, 90.0f)) } addStep("scale") { ImagePipelineMember(image = scale(it.image!!, 400, 400)) } addCountedBarrier("overdraw", capacity = Int.MAX_VALUE) { allMembers -> allMembers.mapIndexed { index, member -> val siblingIndex = if (index == 0) allMembers.lastIndex else index - 1 val resultingImage = overdraw(member.image!!, allMembers[siblingIndex].image!!) ImagePipelineMember(image = resultingImage) } } }We can then use this factory function to create an instance of the pipeline:
val pipeline = makePipeline()And then schedule jobs into it:
const val JOB_TAG = "IMAGE_TRANSFORM_JOBS" fun Pipeline<ImagePipelineMember>.createImageJobs(imageUrls: List<String>): List<Job<ImagePipelineMember>> { countedBarriers.forEach { it.setCapacity(imageUrls.size) } return imageUrls.map { url -> push(ImagePipelineMember(url = url), tag = JOB_TAG) } }We can subscribe to these jobs' state changes in our activities/fragments via LiveData:
val jobs = pipeline.createImageJobs(urls) jobs.forEach { job -> job.state.observe(this, Observer { // Update UI in response to state changes }) }See the state machine for the details.
We can also fetch any ongoing jobs from the repository (e.g., if the fragment is re-created):
JOBS_REPO[JOB_TAG].forEach { (job, _, _) -> // Perform actions on the jobs, such as interrupting them, unsubscribing from them, etc. }To lift the internet_barrier automatically when we have internet connectivity, we can use one of the BarrierExtensions:
pipeline.manualBarriers[1].liftWhenHasInternet(App.INSTANCE)And finally, we can start the pipeline when a UI button is clicked:
someButton.setOnClickListener { pipeline.manualBarriers[0].lift() }The progress of a job is encoded via a state machine:
To see a full Android app example using the above pipeline, see the sample app. Or see the javadocs here.

