Last active
December 10, 2015 17:48
-
-
Save dkincaid/4470361 to your computer and use it in GitHub Desktop.
Example of an issue with trying to use two PailTap's reading from the same Pail in a query.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* If I execute only the clientQuery or only the emailQuery by themselves everything works right. | |
I set breakpoints inside the ExtractClientEdgeFields() and ExtractClientId() functions and they | |
are called with only the Data objects with the correct property types. | |
However, if I execute this query as it is shown here then only one of the two functions is called | |
with all of the Data objects from both taps. */ | |
public static Subquery clientEmail(String pailPath) { | |
PailTap clientEdgeTap = clientEdgeTap(pailPath); | |
PailTap clientTap = petOwnerTap(pailPath); | |
Subquery clientQuery = new Subquery("?sapid", "?clientid") | |
.predicate(clientEdgeTap, "_", "?client-edge-data") | |
.predicate(new ExtractClientEdgeFields(), "?client-edge-data").out("?sapid", "?clientid"); | |
Subquery emailQuery = new Subquery("?clientid", "?email") | |
.predicate(clientTap, "_", "?pet-owner-data") | |
.predicate(new ExtractClientId(), "?pet-owner-data").out("?clientid", "?email"); | |
Subquery fullQuery = new Subquery("?sapid", "?clientid", "?email") | |
.predicate(clientQuery, "?sapid", "?clientid") | |
.predicate(emailQuery, "?clientid", "?email"); | |
return fullQuery; | |
} | |
public static void main(String[] args) throws IOException { | |
Tap stdout = new StdoutTap(); | |
Subquery query = clientEmail(PAIL_PATH); | |
Api.execute(stdout, query); | |
} | |
public static class ExtractClientEdgeFields extends CascalogFunction { | |
@Override | |
public void operate(FlowProcess process, FunctionCall call) { | |
Data data = (Data) call.getArguments().getObject(0); | |
// TODO Why do I need this if statement? We should only be getting ClientEdge Data in here | |
if (data.getDataunit().getSetField() == DataUnit._Fields.CLIENT) { | |
ClientEdge clientEdge = data.getDataunit().getClient(); | |
PracticeId practiceId = clientEdge.getPractice(); | |
ClientId clientId = clientEdge.getClient(); | |
call.getOutputCollector().add(new Tuple(practiceId.getSap_id(), clientId.getPims_client_id().getId())); | |
} | |
} | |
} | |
public static class ExtractClientId extends CascalogFunction { | |
@Override | |
public void operate(FlowProcess flow_process, FunctionCall fn_call) { | |
Data data = (Data) fn_call.getArguments().getObject(0); | |
// TODO Why do I need this? | |
if (data.getDataunit().getSetField() == DataUnit._Fields.CLIENT_PROPERTY) { | |
ClientProperty clientProperty = data.getDataunit().getClient_property(); | |
if (clientProperty.getProperty().getSetField() == ClientPropertyValue._Fields.EMAIL) { | |
fn_call.getOutputCollector().add(new Tuple(clientProperty.getId().getPims_client_id().getId(), clientProperty.getProperty().getEmail())); | |
} | |
} | |
} | |
} | |
public static PailTap attributeTap(String path, final TFieldIdEnum... fields) { | |
List<String>[] attrs = new List[] { | |
new ArrayList<String>() {{ | |
for (TFieldIdEnum field : fields) { | |
add("" + field.getThriftFieldId()); | |
} | |
}} | |
}; | |
return splitDataTap(path, attrs); | |
} | |
public static PailTap splitDataTap(String path, List<String>[] attrs) { | |
PailTap.PailTapOptions pailTapOptions = new PailTap.PailTapOptions(); | |
pailTapOptions.spec = PailTap.makeSpec(null, new SplitDataPailStructure()); | |
pailTapOptions.attrs = attrs; | |
return new PailTap(path, pailTapOptions); | |
} | |
public static PailTap clientTap(String pailPath) { | |
return attributeTap(pailPath, DataUnit._Fields.CLIENT_PROPERTY); | |
} | |
public static PailTap clientEdgeTap(String pailPath) { | |
return attributeTap(pailPath, DataUnit._Fields.CLIENT); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment