Skip to content

Commit

Permalink
New PluginExecutor command
Browse files Browse the repository at this point in the history
- A plugin class can be defined outside of the Adam jar, but run through
  the normal AdamMain
- An example plugin, the "Take10Plugin" is included in the test
  directory
- Adds a test suite to the cli module, which can reference the items
  available in the core module
- Adds notion of AccessControl to control the records which can be
  accessed
  • Loading branch information
carlyeks committed Feb 14, 2014
1 parent af08968 commit 5ee2d77
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# ADAM #
* ISSUE [101](https://github.com/bigdatagenomics/adam/issues/101): Add ability to call 'plugins' from the command-line
* ISSUE [99](https://github.com/bigdatagenomics/adam/pull/99): Encoding tag types in the ADAMRecord attributes, adding the 'tags' command
* ISSUE [105](https://github.com/bigdatagenomics/adam/pull/105): Add initial documentation on contributing
* ISSUE [79](https://github.com/bigdatagenomics/adam/pull/79): Adding ability to convert reference FASTA files for nucleotide sequences
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ Trunk (not yet released)
* Added ADAMRod model and Reads2Rods transformation; this is a pileup generation function that better takes
advantage of locality for data that is already sorted. This was introduced in PR#36.

* ISSUE 101: Adding ability to call plugins from the command line that are not defined in the main Adam jar but are
included in the classpath.

OPTIMIZATIONS

* Transformed phred --> double calculation into a LUT, which improves performance. This change was introduced
Expand Down
6 changes: 6 additions & 0 deletions adam-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
<groupId>edu.berkeley.cs.amplab.adam</groupId>
<artifactId>adam-core</artifactId>
</dependency>
<!-- Test-scoped dependency on the adam-core test-jar artifact, in addition to the regular adam-core dependency. -->
<dependency>
<groupId>edu.berkeley.cs.amplab.adam</groupId>
<artifactId>adam-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ object AdamMain extends Logging {
Adam2Vcf,
Vcf2Adam,
FindReads,
Fasta2Adam)
Fasta2Adam,
PluginExecutor)

private def printCommands() {
println("\n")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Copyright 2014 Genome Bridge LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.berkeley.cs.amplab.adam.cli

import org.kohsuke.args4j.{Argument,Option=>Args4jOption}
import org.apache.spark.SparkContext
import org.apache.hadoop.mapreduce.Job
import edu.berkeley.cs.amplab.adam.plugins.{AccessControl, AdamPlugin}
import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
import parquet.filter.UnboundRecordFilter
import org.apache.avro.specific.SpecificRecord
import edu.berkeley.cs.amplab.adam.avro.ADAMRecord

/**
 * Companion for the `plugin_executor` command.
 *
 * Supplies the command name and description, and builds a [[PluginExecutor]]
 * from the raw command-line arguments.
 */
object PluginExecutor extends AdamCommandCompanion {
  val commandName: String = "plugin_executor"
  val commandDescription: String = "Executes an AdamPlugin"

  /**
   * Parses the command line into [[PluginExecutorArgs]] and wraps it in a [[PluginExecutor]].
   *
   * @param cmdLine the arguments that follow the command name
   * @return the command, ready to be run
   */
  def apply(cmdLine: Array[String]): AdamCommand = {
    // Must build this command with its own argument class; constructing a
    // different command here would make `plugin_executor` run that command instead.
    new PluginExecutor(Args4j[PluginExecutorArgs](cmdLine))
  }
}

/**
 * Command-line arguments accepted by the `plugin_executor` command.
 *
 * Two positional arguments are required (the plugin class name, then the input
 * location); the access-control class is optional and has a default value.
 */
class PluginExecutorArgs extends Args4jBase with SparkArgs with ParquetArgs {

  /** Class name of the AdamPlugin to run (positional argument 0). */
  @Argument(index = 0, required = true, metaVar = "PLUGIN", usage = "The AdamPlugin to run")
  var plugin: String = _

  /** Location(s) of the input to load (positional argument 1). */
  @Argument(index = 1, required = true, metaVar = "INPUT", usage = "The input locations")
  var input: String = _

  /** Class name of the AccessControl implementation to apply. */
  @Args4jOption(name = "-access_control", usage = "Class for access control")
  var accessControl: String = "edu.berkeley.cs.amplab.adam.plugins.EmptyAccessControl"
}

/**
 * Command that instantiates an AdamPlugin and an AccessControl by class name,
 * loads ADAMRecords from the input location, filters them with the combined
 * predicates, runs the plugin and prints the results to standard output.
 *
 * @param args the parsed command-line arguments
 */
class PluginExecutor(protected val args: PluginExecutorArgs) extends AdamSparkCommand[PluginExecutorArgs] {
  val companion: AdamCommandCompanion = PluginExecutor

  /** Loads `className` through the thread's context class loader and creates an instance of it. */
  private def newInstanceOf(className: String): Any = {
    val loader = Thread.currentThread().getContextClassLoader
    loader.loadClass(className).newInstance()
  }

  /**
   * Instantiates the plugin with the given class name.
   *
   * @param pluginName fully-qualified class name of the plugin
   * @return the new instance, cast to AdamPlugin[Input, Output]
   */
  def loadPlugin[Input <% SpecificRecord : Manifest, Output](pluginName: String): AdamPlugin[Input, Output] =
    newInstanceOf(pluginName).asInstanceOf[AdamPlugin[Input, Output]]

  /**
   * Instantiates the access control with the given class name.
   *
   * @param accessControl fully-qualified class name of the access control
   * @return the new instance, cast to AccessControl[Input]
   */
  def loadAccessControl[Input <% SpecificRecord : Manifest](accessControl: String): AccessControl[Input] =
    newInstanceOf(accessControl).asInstanceOf[AccessControl[Input]]

  /**
   * Loads the records at `locations` via `adamLoad`, passing the optional projection through.
   *
   * @param sc         the Spark context to load with
   * @param locations  the input location(s)
   * @param projection optional schema handed to `adamLoad` as its projection
   */
  def load[Input <% SpecificRecord : Manifest](sc: SparkContext, locations: String, projection: Option[Schema]): RDD[Input] = {
    val loaded: RDD[Input] = sc.adamLoad[Input, UnboundRecordFilter](locations, projection = projection)
    loaded
  }

  /**
   * Collects the RDD and prints each element's `toString` on its own line.
   *
   * @param sc     the Spark context (not used by this method)
   * @param output the results to print
   */
  def output[Output](sc: SparkContext, output: RDD[Output]): Unit = {
    val lines = output.map(_.toString).collect()
    lines.foreach(println)
  }

  /**
   * Runs the configured plugin over the configured input.
   *
   * @param sc  the Spark context
   * @param job the Hadoop job (not used by this method)
   */
  def run(sc: SparkContext, job: Job): Unit = {
    val plugin = loadPlugin[ADAMRecord,Any](args.plugin)
    val accessControl = loadAccessControl[ADAMRecord](args.accessControl)

    // Create an optional combined filter so that pass-through is not penalized:
    // a function is only built when both sides supply a predicate.
    val combined: Option[ADAMRecord => Boolean] =
      (accessControl.predicate, plugin.predicate) match {
        case (Some(allowed), Some(wanted)) =>
          Some((value: ADAMRecord) => allowed(value) && wanted(value))
        case (accessOnly, None) => accessOnly
        case (None, pluginOnly) => pluginOnly
      }

    val firstRdd: RDD[ADAMRecord] = load[ADAMRecord](sc, args.input, plugin.projection)

    // Without any predicate the loaded RDD is used as-is.
    val input = combined.fold(firstRdd)(keep => firstRdd.filter(keep))

    println("# Input records: %d".format(input.count()))

    val results = plugin.run(sc, input)

    println("# Output records: %d".format(results.count()))

    output(sc, results)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright 2014 Genome Bridge LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.berkeley.cs.amplab.adam.cli

import java.io._
import org.scalatest.FunSuite

/**
 * Smoke test for [[PluginExecutor]]: runs the Take10Plugin over the bundled
 * `reads12.sam` resource. No assertion is made on the produced output yet.
 */
class PluginExecutorSuite extends FunSuite {

  test("take10 works correctly on example SAM") {

    val args = new PluginExecutorArgs()
    args.plugin = "edu.berkeley.cs.amplab.adam.plugins.Take10Plugin"

    // The input argument is a path, so the classpath resource is copied to a temporary file.
    val stream = Thread.currentThread().getContextClassLoader.getResourceAsStream("reads12.sam")
    assert(stream != null, "reads12.sam was not found on the test classpath")

    val file = File.createTempFile("reads12", ".sam")
    file.deleteOnExit()
    try {
      val os = new FileOutputStream(file)
      try {
        // Copy until end-of-stream; available() plus a single read() is not
        // guaranteed to transfer the whole resource.
        val buffer = new Array[Byte](4096)
        var read = stream.read(buffer)
        while (read != -1) {
          os.write(buffer, 0, read)
          read = stream.read(buffer)
        }
      } finally {
        os.close()
      }
    } finally {
      stream.close()
    }
    args.input = file.getAbsolutePath

    val pluginExecutor = new PluginExecutor(args)

    val pipeIn = new PipedInputStream()
    val ps = new PrintStream(new PipedOutputStream(pipeIn))
    //scala.Console.withOut(ps)(() => pluginExecutor.run())
    pluginExecutor.run()
    ps.close()

    // Nothing is written to the pipe while the redirect above is disabled,
    // so this read only observes end-of-stream.
    val reader = new BufferedReader(new InputStreamReader(pipeIn))
    try {
      val outputString = reader.readLine()
      //assert(outputString === "foo bar")
    } finally {
      reader.close()
    }

  }
}
12 changes: 12 additions & 0 deletions adam-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@
</execution>
</executions>
</plugin>
<!-- Runs the maven-jar-plugin test-jar goal for this module; adam-cli declares a
     test-scoped dependency on the adam-core test-jar artifact. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright 2014 Genome Bridge LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.berkeley.cs.amplab.adam.plugins

/**
 * Decides which records of type `Input` may be accessed.
 *
 * @tparam Input the record type the control applies to
 */
trait AccessControl[Input] {

  /**
   * The optional record-level test.
   *
   * @return `Some(test)` to restrict access to the records for which `test`
   *         returns true, or `None` when no restriction is supplied
   */
  def predicate: Option[(Input) => Boolean]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright 2014 Genome Bridge LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.berkeley.cs.amplab.adam.plugins

import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

/**
 * A unit of work that turns an RDD of `Input` records into an RDD of `Output` values.
 *
 * @tparam Input  the type of the records handed to the plugin
 * @tparam Output the type of the values the plugin produces
 */
trait AdamPlugin[Input, Output] {

  /** Optional schema describing the projection the plugin wants for its input. */
  def projection: Option[Schema]

  /** Optional record-level test the plugin wants applied to its input. */
  def predicate: Option[(Input) => Boolean]

  /**
   * Executes the plugin.
   *
   * @param sc   the Spark context
   * @param recs the input records
   * @return the plugin's results
   */
  def run(sc: SparkContext, recs: RDD[Input]): RDD[Output]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright 2014 Genome Bridge LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.berkeley.cs.amplab.adam.plugins

/**
 * An [[AccessControl]] that supplies no predicate at all.
 *
 * @tparam T the record type
 */
class EmptyAccessControl[T] extends AccessControl[T] with Serializable {

  /** Always empty: this access control never contributes a record test. */
  override def predicate: Option[(T) => Boolean] = Option.empty
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2014 Genome Bridge LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.berkeley.cs.amplab.adam.plugins

import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

/**
 * Example plugin that returns the first ten records of its input.
 *
 * It requests no projection and supplies no predicate.
 */
class Take10Plugin extends AdamPlugin[ADAMRecord, ADAMRecord] with Serializable {

  /** No projection is requested. */
  override def projection: Option[Schema] = None

  /**
   * No predicate is supplied. The declared type must be the `Option` form
   * required by [[AdamPlugin]]; a bare function type does not override it.
   */
  override def predicate: Option[ADAMRecord => Boolean] = None

  /**
   * Takes ten records from `recs` and re-parallelizes them into a new RDD.
   *
   * @param sc   the Spark context used to build the result RDD
   * @param recs the input records
   * @return an RDD holding at most the first ten input records
   */
  override def run(sc: SparkContext, recs: RDD[ADAMRecord]): RDD[ADAMRecord] = {
    sc.parallelize(recs.take(10))
  }
}

0 comments on commit 5ee2d77

Please sign in to comment.