Creating data pipelines
To create data pipelines, you need to execute 'tasks' and coordinate their execution dependencies. Here we show how to do it in bds.
Executing a task
The most basic operation is to execute a task, which is done using a task expression. bds takes care of executing the task on different environments (local computer, server, cluster, etc.), so you don't need to worry about mundane details (such as monitoring cluster queues or querying remote computers for resources).
In this toy example, we schedule 10 tasks for execution. I'm running this on a computer that only has 8 CPUs, so not all tasks can execute in parallel.
File test_04.bds
#!/usr/bin/env bds
for( int i=0 ; i < 10 ; i++ ) {
task echo Hi $i ; sleep 1 ; echo Done $i ; sleep 1
}
Note that bds interpolates the variable $i (string interpolation simply means replacing a variable with its value within the string).
$ ./test_04.bds
Hi 1
Hi 6
Hi 2
Hi 4
Hi 3
Hi 7
Hi 0
Hi 5
Done 1
Done 6
Done 2
Done 4
Done 3
Done 7
Done 0
Done 5
Hi 9
Hi 8
Done 9
Done 8
In this case, we are running on an 8-core computer, so the first 8 tasks are executed in parallel. The remaining tasks are executed as the first ones finish.
Warning: Task execution order is not guaranteed (e.g. a cluster scheduler can decide to run tasks out of order). We'll see how to coordinate tasks later.
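As a side note, bds also lets a task declare how many CPUs it needs, which limits how many tasks run concurrently on the local machine. Here is a minimal sketch assuming the standard cpus task option (this snippet is not part of test_04.bds):
#!/usr/bin/env bds
# Hedged sketch: each task declares it needs 4 CPUs, so on an 8-CPU
# computer we would expect at most two of these tasks to run at a time.
for( int i=0 ; i < 10 ; i++ ) {
    task( cpus := 4 ) {
        sys echo Hi $i ; sleep 1 ; echo Done $i
    }
}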
Running on a cluster
Here we show how exactly the same script is run on a cluster; keep in mind that not a single line of code changes. We copy the same script (used in the previous section) to a cluster and execute it, but now the tasks are scheduled using MOAB, Torque, PBS or Grid Engine.
$ bds -s cluster test_04.bds
Task dependencies
In a typical pipeline we must be able to control task execution which depends on previously executed tasks. Here we show how.
Waiting for a task to finish: 'wait'
The simplest form of execution control is to wait until one or more tasks finish before executing the next task(s).
This is done using the wait statement.
In this example, we run a set of tasks (echo Hi $i) and, after all of them have finished, we run another set of tasks (echo Bye $i).
File test_05.bds
#!/usr/bin/env bds
for( int i=0 ; i < 10 ; i++ ) {
task echo Hi $i ; sleep 1
}
wait
print("After wait\n")
for( int i=0 ; i < 10 ; i++ ) {
task echo Bye $i ; sleep 1
}
Running the script, we get
$ ./test_05.bds
Hi 7
Hi 0
Hi 6
Hi 5
Hi 1
Hi 3
Hi 4
Hi 2
Hi 8
Hi 9
After wait
Bye 1
Bye 0
Bye 7
Bye 2
Bye 6
Bye 4
Bye 3
Bye 5
Bye 9
Bye 8
- The wait statement is like a barrier: the program does not continue until all tasks have finished. If any task fails, a checkpoint file is created, in which all program data is serialized. We can correct the problem and restart the pipeline where it left off.
- The wait statement can wait for all tasks, for a single task, or for a list of tasks (see the sketch after this list).
- We then run the same script on a cluster. Again, this is done simply by using the -s cluster command line option; the tasks are scheduled and run on the cluster, always honoring the wait statement.
- The exit code of a bds script is 0 if all tasks executed without any problems, and non-zero if any task failed.
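As a minimal sketch of waiting for a single task or a list of tasks, assuming the usual pattern where a task expression returns a task ID (this snippet is not part of test_05.bds):
#!/usr/bin/env bds
# Hedged sketch: a task expression returns a task ID, so 'wait' can
# target a single task or a list of task IDs instead of all tasks.
string[] tids
for( int i=0 ; i < 5 ; i++ ) {
    tid := task echo Hi $i ; sleep 1
    tids.add(tid)
}

# Wait for a single task (the first one we scheduled)
firstTid := tids[0]
wait firstTid
print("First task finished\n")

# Wait for the whole list of tasks
wait tids
print("All tasks finished\n")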
Dependency operator '<-'
The dependency operator provides a simple way to check whether a file needs to be updated (i.e. recalculated) with respect to some inputs. For instance, when we have already processed some files and have the corresponding results, we can save some work if the inputs have not changed (much like the "make" command).
We introduce the dependency operator <- (pronounced 'dep'), which is a "make"-style operator.
The expression out <- in is true if the file 'out' needs to be updated.
More formally, the expression is true if the file named by the variable 'out' does not exist, is empty (zero length), or has a creation date earlier than that of 'in'.
E.g.:
File test_06.bds
#!/usr/bin/env bds
string inFile = "in.txt"
string outFile = "out.txt"
# Create 'in.txt' if it doesn't exist
if( !inFile.exists() ) {
task echo Creating $inFile; echo Hello > $inFile
}
wait
# Create 'out.txt' only if it needs to be updated with respect to 'in.txt'
if( outFile <- inFile ) {
task echo Creating $outFile; cat $inFile > $outFile
}
When executing for the first time, both tasks are executed and both files ('in.txt' and 'out.txt') are created.
$ ./test_06.bds
Creating in.txt
Creating out.txt
If we execute it a second time, since the files have not changed, no task is executed.
$ ./test_06.bds
$
If we now change the contents of 'in.txt' and run the script again, the second task will be executed (because 'out.txt' needs to be updated with respect to 'in.txt').
# Update 'in.txt'
$ date > in.txt
# Since we updated the input file, the output must be recalculated
$ ./test_06.bds
Creating out.txt
Summary: If the file 'out.txt' is up to date with respect to 'in.txt', the following condition will be false and the task will not execute:
if( outFile <- inFile ) {
task echo Creating $outFile; cat $inFile > $outFile
}
This construction is so common that we allow for some syntactic sugar.
task( outFile <- inFile ) {
sys echo Creating $outFile; cat $inFile > $outFile
}
Task dependency detection
Programming task dependencies can be difficult. bds can help by automatically inferring task dependencies and executing tasks in the correct order.
In this example, we have two tasks:
- The first task uses an input file 'in.txt' to create an intermediate file 'inter.txt'.
- The second task uses the intermediate file 'inter.txt' to create the output file 'out.txt'.
The script below does not have a wait statement. Instead, bds automatically infers that the second task depends on the first one, and does not start executing it until the first task finishes. Notice that we don't tell bds to wait for the first task to finish (there is no explicit wait statement).
File test_07.bds
#!/usr/bin/env bds
# We use ':=' for declaration with type inference
inFile := "in.txt"
intermediate := "inter.txt"
outFile := "out.txt"
task( intermediate <- inFile) {
sys echo Creating $intermediate; cat $inFile > $intermediate; sleep 1 ; echo Done $intermediate
}
task( outFile <- intermediate ) {
sys echo Creating $outFile; cat $intermediate > $outFile; echo Done $outFile
}
As a side note: we used the := operator to declare variables using type inference. So we can write inFile := "in.txt" instead of string inFile = "in.txt", which is not only shorter to type, but also makes the code look cleaner.
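A few more quick examples of what ':=' infers (a small sketch for illustration, not part of test_07.bds):
count   := 10                      # inferred as int
rate    := 0.5                     # inferred as real
verbose := true                    # inferred as bool
files   := ["in.txt", "out.txt"]   # inferred as string[] (a list of strings)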
Now let's run the script
# Delete old files (if any)
$ rm *.txt
# Create input file
$ date > in.txt
# Run
$ ./test_07.bds
Creating inter.txt
Done inter.txt
Creating out.txt
Done out.txt
Note how the second task is executed only after the first one finished.
Complex dependencies: dep and goal
The goal statement helps to program complex task scheduling interdependencies.
In the previous example, we had an input file 'in.txt', an intermediate file 'inter.txt' and an output file 'out.txt'. One problem is that if we delete the intermediate file 'inter.txt' (e.g. because we may want to remove big files containing intermediate results), then both tasks will be executed.
For convenience, here is the code again. File test_07.bds
#!/usr/bin/env bds
inFile := "in.txt"
intermediate := "inter.txt"
outFile := "out.txt"
task( intermediate <- inFile) {
sys echo Creating $intermediate; cat $inFile > $intermediate; sleep 1 ; echo Done $intermediate
}
task( outFile <- intermediate ) {
sys echo Creating $outFile; cat $intermediate > $outFile; echo Done $outFile
}
# Remove intermediate file
$ rm inter.txt
# Re-execute script
$ ./test_07.bds
Creating inter.txt
Done inter.txt
Creating out.txt
Done out.txt
Why is this happening? The reason is that task statements are evaluated in order. So when bds evaluates the first task expression, the dependency intermediate <- inFile is true (because 'inter.txt' doesn't exist, so it must be updated with respect to 'in.txt'). After that, when the second task expression is evaluated, outFile <- intermediate is also true, since 'inter.txt' is newer than 'out.txt'.
As a result, both tasks are re-executed, even though 'out.txt' is up to date with respect to 'in.txt'.
This can be a problem, particularly if each task requires several hours of execution.
There are two ways to solve this; the obvious one is to add a simple 'if' statement surrounding the tasks:
#!/usr/bin/env bds
inFile := "in.txt"
intermediate := "inter.txt"
outFile := "out.txt"
if( outFile <- inFile) {
task( intermediate <- inFile) {
sys echo Creating $intermediate; cat $inFile > $intermediate; sleep 1 ; echo Done $intermediate
}
task( outFile <- intermediate ) {
sys echo Creating $outFile; cat $intermediate > $outFile; echo Done $outFile
}
}
Although it solves the issue, the code is not elegant.
The alternative is to use dep and goal:
- dep defines a task in exactly the same way as a task expression, but it does not evaluate whether the task should be executed or not (it is just declarative).
- goal executes all the dependencies necessary to create an output.
Example:
File test_08.bds
#!/usr/bin/env bds
inFile := "in.txt"
intermediate := "inter.txt"
outFile := "out.txt"
dep( intermediate <- inFile) {
sys echo Creating $intermediate; cat $inFile > $intermediate; sleep 1 ; echo Done $intermediate
}
dep( outFile <- intermediate ) {
sys echo Creating $outFile; cat $intermediate > $outFile; echo Done $outFile
}
goal outFile
If we execute this script
# Delete old files (if any)
$ rm *.txt
# Create input file
$ date > in.txt
# Run script (both tasks should be executed)
$ ./test_08.bds
Creating out.txt
Done out.txt
Creating inter.txt
Done inter.txt
Now we delete 'inter.txt' and re-execute
# Delete intermediate file
$ rm inter.txt
# Run again (out.txt is still up to date with respect to in.txt, so no task should be executed)
$ ./test_08.bds
$
As you can see, no task is executed the second time, since 'out.txt' is up to date with respect to 'in.txt'. The fact that the intermediate file 'inter.txt' was deleted is ignored, which is what we wanted.
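As a complementary check (not shown above, but what we would expect from the goal semantics just described): if 'in.txt' itself is updated, 'out.txt' becomes out of date again, and goal should re-execute whatever dependencies are needed to rebuild it.
# Update the input file
$ date > in.txt
# Run again: we would expect both dependencies to be re-executed to rebuild out.txt
$ ./test_08.bds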