Created
June 3, 2020 16:44
-
-
Save DuncanDoyle/d0f037ff2ee28733aa4be74d2201b3b3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.redhat.piidmigration; | |
import java.io.ByteArrayInputStream; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.io.ObjectInputStream; | |
import java.io.ObjectOutputStream; | |
import java.nio.file.Files; | |
import java.nio.file.Paths; | |
import java.sql.Connection; | |
import java.sql.DriverManager; | |
import java.sql.PreparedStatement; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.Arrays; | |
import java.util.Map; | |
import org.drools.core.common.InternalWorkingMemory; | |
import org.drools.core.impl.InternalKnowledgeBase; | |
import org.drools.core.impl.StatefulKnowledgeSessionImpl; | |
import org.drools.core.marshalling.impl.InputMarshaller; | |
import org.drools.core.marshalling.impl.MarshallerReaderContext; | |
import org.drools.core.marshalling.impl.MarshallerWriteContext; | |
import org.drools.core.marshalling.impl.PersisterHelper; | |
import org.drools.core.marshalling.impl.ProcessMarshallerWriteContext; | |
import org.drools.core.marshalling.impl.ProtobufInputMarshaller; | |
import org.drools.core.marshalling.impl.ProtobufMarshaller; | |
import org.drools.core.marshalling.impl.ProtobufOutputMarshaller; | |
import org.drools.core.process.instance.WorkItem; | |
import org.jbpm.marshalling.impl.JBPMMessages; | |
import org.jbpm.marshalling.impl.ProcessInstanceMarshaller; | |
import org.jbpm.marshalling.impl.ProcessMarshallerRegistry; | |
import org.jbpm.marshalling.impl.ProtobufRuleFlowProcessInstanceMarshaller; | |
import org.jbpm.process.instance.impl.ProcessInstanceImpl; | |
import org.jbpm.ruleflow.instance.RuleFlowProcessInstance; | |
import org.kie.api.KieServices; | |
import org.kie.api.marshalling.ObjectMarshallingStrategyStore; | |
import org.kie.api.runtime.KieContainer; | |
import org.kie.api.runtime.KieSession; | |
import org.kie.api.runtime.process.ProcessInstance; | |
import org.postgresql.largeobject.LargeObject; | |
import org.postgresql.largeobject.LargeObjectManager; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* All the ProtobufMessages that make up the bytearray can be found here: https://github.com/kiegroup/jbpm/blob/master/jbpm-flow/src/main/resources/org/jbpm/marshalling/jbpmmessages.proto | |
* | |
* So far, I've found the following messages that reference process-instance-id: | |
* | |
* ProcessInstance.id | |
* ProcessInstance.parent_process_instance_id | |
* HumanTaskNode.error_handling_process_instance_id | |
* WorkItemNode.error_handling_process_instance_id | |
* SubProcessNode.process_instance_id | |
* WorkItemNode.process_instances_id | |
* | |
* DISCLAIMER: This operation is extremely dangerous! And only tested on very simple use-cases. Currently it is only capable of | |
* | |
*/ | |
public class PiidMigrationManager { | |
private static final Logger LOGGER = LoggerFactory.getLogger(PiidMigrationManager.class); | |
public static void main(final String[] args) throws Exception { | |
final String url = "jdbc:postgresql://localhost:5432/bpmsuite-64"; | |
final String user = "postgres"; | |
final String password = "postgres"; | |
final long processInstanceId = 30070L; | |
KieServices ks = KieServices.Factory.get(); | |
KieContainer kContainer = ks.getKieClasspathContainer(); | |
KieSession kieSession = kContainer.newKieSession(); | |
LOGGER.debug("Connection to DB!"); | |
final Connection conn = connect(url, user, password); | |
//Need to disable auto-commit in order to work with LargeObjects | |
conn.setAutoCommit(false); | |
processProcessInstances(processInstanceId, conn, kieSession); | |
processWorkItems(processInstanceId, conn, kieSession); | |
conn.close(); | |
} | |
private static void processProcessInstances(long processInstanceId, Connection conn, KieSession kieSession) throws Exception { | |
final PreparedStatement ps = conn | |
.prepareStatement("SELECT processinstancebytearray FROM processinstanceinfo p WHERE p.instanceid = ?"); | |
ps.setLong(1, processInstanceId); | |
LOGGER.debug("Retrieving ProcessInstance bytearray"); | |
final ResultSet resultSet = ps.executeQuery(); | |
if (resultSet.next()) { | |
//final byte[] processInstanceByteArray = resultSet.getBytes(1); | |
int columnIndex = 1; | |
LargeObjectManager lobj = ((org.postgresql.PGConnection) conn).getLargeObjectAPI(); | |
long oid = resultSet.getLong(columnIndex); | |
if (oid < 1) { | |
throw new RuntimeException("Invalid bytearray object id!"); | |
} | |
LargeObject obj = lobj.open(oid, LargeObjectManager.READ); | |
//in = obj.getInputStream(); | |
byte[] processInstanceByteArray = new byte[obj.size()]; | |
obj.read(processInstanceByteArray, 0, obj.size()); | |
//LOGGER.debug("Found ProcessInstance ByteArray with value: " + processInstanceByteArray.toString()); | |
ProcessInstance processInstance = unmarshalProcessInstance(processInstanceByteArray, kieSession); | |
LOGGER.debug("Unmarshalled ProcessInstance with instance-id: " + processInstance.getId()); | |
//Change the processInstance Id | |
((RuleFlowProcessInstance) processInstance).setId(processInstanceId); | |
byte[] marshalledProcessInstanceByteArray = marshalProcessInstance(processInstance, kieSession); | |
boolean baEqual = Arrays.equals(processInstanceByteArray, marshalledProcessInstanceByteArray); | |
if (baEqual) { | |
LOGGER.debug("Byte arrays are equal!"); | |
} else { | |
LOGGER.debug("Byte arrays are not equal!"); | |
} | |
LargeObject objWrite = lobj.open(oid, LargeObjectManager.WRITE); | |
objWrite.write(marshalledProcessInstanceByteArray); | |
conn.commit(); | |
} | |
} | |
private static void processWorkItems(long processInstanceId, Connection conn, KieSession kieSession) throws Exception { | |
final PreparedStatement psWorkItem = conn | |
.prepareStatement("SELECT workitembytearray FROM workiteminfo w WHERE w.processinstanceid = ?"); | |
psWorkItem.setLong(1, processInstanceId); | |
LOGGER.debug("Retrieving WorrkItem bytearray"); | |
final ResultSet resultSetWorkItem = psWorkItem.executeQuery(); | |
while (resultSetWorkItem.next()) { | |
int columnIndex = 1; | |
LargeObjectManager lobj = ((org.postgresql.PGConnection) conn).getLargeObjectAPI(); | |
long oid = resultSetWorkItem.getLong(columnIndex); | |
if (oid < 1) { | |
throw new RuntimeException("Invalid bytearray object id!"); | |
} | |
LargeObject obj = lobj.open(oid, LargeObjectManager.READ); | |
//in = obj.getInputStream(); | |
byte[] workItemByteArray = new byte[obj.size()]; | |
obj.read(workItemByteArray, 0, obj.size()); | |
//LOGGER.debug("Found ProcessInstance ByteArray with value: " + processInstanceByteArray.toString()); | |
WorkItem workItem = unmarshalWorkItem(workItemByteArray, kieSession); | |
LOGGER.debug("Unmarshalled WorkItem with instance-id: " + workItem.getId()); | |
//Change ProcessInstanceId. | |
workItem.setProcessInstanceId(processInstanceId); | |
byte[] marshalledWorkItemByteArray = marshalWorkItem(workItem, kieSession); | |
boolean baEqual = Arrays.equals(workItemByteArray, marshalledWorkItemByteArray); | |
if (baEqual) { | |
LOGGER.debug("Byte arrays are equal!"); | |
} else { | |
LOGGER.debug("Byte arrays are not equal!"); | |
} | |
WorkItem newWorkItem = unmarshalWorkItem(marshalledWorkItemByteArray, kieSession); | |
LOGGER.debug("Original: " + workItem.toString()); | |
LOGGER.debug("New: " + newWorkItem.toString()); | |
LOGGER.debug("DeploymentId: " + workItem.getDeploymentId()); | |
LOGGER.debug("DeploymentId: " + newWorkItem.getDeploymentId()); | |
LOGGER.debug("NodeId: " + workItem.getNodeId()); | |
LOGGER.debug("NodeId: " + newWorkItem.getNodeId()); | |
LOGGER.debug("NodeInstanceId: " + workItem.getNodeInstanceId()); | |
LOGGER.debug("NodeInstanceId: " + newWorkItem.getNodeInstanceId()); | |
LargeObject objWrite = lobj.open(oid, LargeObjectManager.WRITE); | |
objWrite.write(marshalledWorkItemByteArray); | |
conn.commit(); | |
} | |
} | |
public static Connection connect(final String url, final String user, final String password) { | |
Connection conn = null; | |
try { | |
conn = DriverManager.getConnection(url, user, password); | |
System.out.println("Connected to the PostgreSQL server successfully."); | |
} catch (final SQLException e) { | |
System.out.println(e.getMessage()); | |
} | |
return conn; | |
} | |
public static ProcessInstance unmarshalProcessInstance(byte[] processInstanceByteArray, KieSession kieSession) throws Exception { | |
///byte[] data = readBinaryData("processByteArray.txt"); | |
byte[] data = processInstanceByteArray; | |
ProcessInstance processInstance = unmarshallProcessInstances(data, kieSession); | |
System.out.println("processInstance = " + processInstance); | |
/* | |
Collection<NodeInstance> nodeInstances = ((WorkflowProcessInstanceImpl)processInstance).getNodeInstances(); | |
for (NodeInstance nodeInstance : nodeInstances) { | |
System.out.println("nodeInstance = " + nodeInstance); | |
if (nodeInstance instanceof HumanTaskNodeInstance) { | |
HumanTaskNodeInstance humanTaskNodeInstance = (HumanTaskNodeInstance)nodeInstance; | |
System.out.println(" getWorkItemId() = " + humanTaskNodeInstance.getWorkItemId()); | |
System.out.println(" getNodeId() = " + humanTaskNodeInstance.getNodeId()); | |
// System.out.println(" getNodeName() = " + humanTaskNodeInstance.getNodeName()); | |
} | |
} | |
*/ | |
return processInstance; | |
} | |
public static byte[] marshalProcessInstance(ProcessInstance processInstance, KieSession kieSession) throws Exception { | |
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
String processInstanceType = "RuleFlow"; | |
try { | |
ProcessMarshallerWriteContext context = new ProcessMarshallerWriteContext(baos, null, null, null, null, kieSession.getEnvironment()); | |
context.setProcessInstanceId(processInstance.getId()); | |
context.setState(processInstance.getState() == ProcessInstance.STATE_ACTIVE ? | |
ProcessMarshallerWriteContext.STATE_ACTIVE:ProcessMarshallerWriteContext.STATE_COMPLETED); | |
String processType = ((ProcessInstanceImpl) processInstance).getProcess().getType(); | |
saveProcessInstanceType( context, | |
processInstance, | |
processType ); | |
ProcessInstanceMarshaller marshaller = ProcessMarshallerRegistry.INSTANCE.getMarshaller( processType ); | |
Object result = marshaller.writeProcessInstance( context, | |
processInstance); | |
if( marshaller instanceof ProtobufRuleFlowProcessInstanceMarshaller && result != null ) { | |
JBPMMessages.ProcessInstance _instance = (JBPMMessages.ProcessInstance)result; | |
PersisterHelper.writeToStreamWithHeader( context, | |
_instance ); | |
} | |
context.close(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
return baos.toByteArray(); | |
} | |
private static void saveProcessInstanceType(MarshallerWriteContext context, | |
ProcessInstance processInstance, | |
String processInstanceType) throws IOException { | |
ObjectOutputStream stream = context.stream; | |
// saves the processInstance type first | |
stream.writeUTF( processInstanceType ); | |
} | |
private static ProcessInstance unmarshallProcessInstances(byte[] marshalledSessionByteArray, KieSession kieSession) | |
throws Exception { | |
ByteArrayInputStream bais = new ByteArrayInputStream(marshalledSessionByteArray); | |
MarshallerReaderContext context = new MarshallerReaderContext(bais, (InternalKnowledgeBase) kieSession.getKieBase(), null, null, | |
ProtobufMarshaller.TIMER_READERS, kieSession.getEnvironment()); | |
context.wm = ((StatefulKnowledgeSessionImpl) kieSession).getInternalWorkingMemory(); | |
// Unmarshall | |
ObjectInputStream stream = context.stream; | |
String processInstanceType = stream.readUTF(); | |
LOGGER.debug("ProcessInstanceType: " + processInstanceType); | |
ProtobufRuleFlowProcessInstanceMarshaller processMarshaller = (ProtobufRuleFlowProcessInstanceMarshaller) ProcessMarshallerRegistry.INSTANCE | |
.getMarshaller(processInstanceType); | |
ProcessInstance processInstance = null; | |
try { | |
processInstance = processMarshaller.readProcessInstance(context); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
throw e; | |
} | |
context.close(); | |
return processInstance; | |
} | |
public static byte[] marshalWorkItem(WorkItem workItem, KieSession kieSession) { | |
long state = (long) workItem.getState(); | |
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
try { | |
MarshallerWriteContext context = new MarshallerWriteContext(baos, (InternalKnowledgeBase)null, (InternalWorkingMemory)null, (Map)null, (ObjectMarshallingStrategyStore)null, kieSession.getEnvironment()); | |
ProtobufOutputMarshaller.writeWorkItem(context, workItem); | |
context.close(); | |
byte[] workItemByteArray = baos.toByteArray(); | |
return workItemByteArray; | |
} catch (IOException var3) { | |
throw new IllegalArgumentException("IOException while storing workItem " + workItem.getId() + ": " + var3.getMessage()); | |
} | |
} | |
public static WorkItem unmarshalWorkItem(byte[] workItemByteArray, KieSession kieSession) { | |
//this.env = env; | |
WorkItem workItem = null; | |
try { | |
ByteArrayInputStream bais = new ByteArrayInputStream(workItemByteArray); | |
MarshallerReaderContext context = new MarshallerReaderContext(bais, (InternalKnowledgeBase) kieSession.getKieBase(), (Map)null, (ObjectMarshallingStrategyStore)null, (Map)null, kieSession.getEnvironment()); | |
try { | |
workItem = ProtobufInputMarshaller.readWorkItem(context); | |
} catch (Exception var8) { | |
try { | |
context.close(); | |
bais = new ByteArrayInputStream(workItemByteArray); | |
context = new MarshallerReaderContext(bais, (InternalKnowledgeBase) kieSession.getKieBase(), (Map)null, (ObjectMarshallingStrategyStore)null, (Map)null, kieSession.getEnvironment()); | |
workItem = InputMarshaller.readWorkItem(context); | |
} catch (IOException var7) { | |
LOGGER.error("Unable to read work item with InputMarshaller", var7); | |
throw new RuntimeException("Unable to read work item ", var8); | |
} | |
} | |
context.close(); | |
} catch (IOException var9) { | |
var9.printStackTrace(); | |
throw new IllegalArgumentException("IOException while loading work item: " + var9.getMessage()); | |
} | |
return workItem; | |
} | |
private static byte[] readBinaryData(String fileName) throws IOException { | |
byte[] bytes = Files.readAllBytes(Paths.get(fileName)); | |
return bytes; | |
} | |
private static void writeBinaryData(byte[] data, String fileName) throws IOException { | |
Files.write(Paths.get(fileName), data); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment