some changes

This commit is contained in:
2021-11-22 15:59:00 +01:00
parent 122eb1f0b3
commit fcc526f6e2
30 changed files with 135 additions and 111 deletions

View File

@@ -1,23 +1,17 @@
package com.release11;
import com.mysql.cj.jdbc.MysqlDataSource;
import com.release11.Processors.*;
import generated.ObjectFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.SimpleRegistry;
import org.apache.log4j.BasicConfigurator;
import org.apache.camel.converter.jaxb.JaxbDataFormat;
import javax.xml.bind.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
public class Main {
@@ -47,7 +41,9 @@ public class Main {
JaxbDataFormat xmlDataFormat = new JaxbDataFormat();
xmlDataFormat.setContextPath("generated");
//xmlDataFormat.setSchemaLocation("/home/igor/Documents/Jacek/jacek-and-igor/src/main/resources/material.xsd");
JacksonDataFormat jacksonDataFormat = new JacksonDataFormat(ObjectFactory.class);
jacksonDataFormat.setPrettyPrint(true);
errorHandler(deadLetterChannel("activemq:queue:dead"));
from("direct:start")
.setBody(constant("SELECT * FROM material"))
@@ -60,46 +56,46 @@ public class Main {
.setBody(simple("SELECT * FROM package WHERE material_id = :?material_id"))
.to("jdbc:source?useHeadersAsParameters=true");
from("activemq:queue:RawMaterial")
from("activemq:queue:RawMaterial")//subskruypcja
.enrich("direct:getPackages", new MergeAggregator())
.process(new ValidatorProcess())
.choice()
.when(simple("${exchangeProperty[dimension]}"+" == false"))
.marshal(xmlDataFormat)
.to("activemq:queue:BadEan")
.when(simple("${exchangeProperty[ean]}"+" == false"))
.marshal(xmlDataFormat)
.to("activemq:queue:BadDimension")
.otherwise()
.marshal(xmlDataFormat)
.to("activemq:queue:ValidMaterial")
.when(simple("${exchangeProperty[ean]}"+" == false")).marshal(xmlDataFormat).to("activemq:queue:BadEan")
.when(simple("${exchangeProperty[dimension]}"+" == false")).marshal(xmlDataFormat).to("activemq:queue:BadDimension")
.otherwise().marshal(xmlDataFormat).to("activemq:topic:Material")
.to("log:?level=INFO&showBody=true");
from("activemq:topic:Material?clientId=1&durableSubscriptionName=FilteredType")
.filter().xpath("//material_type='A1' or //material_type='A2' or //material_type='A3'")
.to("activemq:queue:FilteredType");
from("activemq:queue:siema")
.process(new XMLProcess())
from("activemq:queue:FilteredType")
.unmarshal(xmlDataFormat)
.process(new XMLProcess())
.to("activemq:queue:test");
.marshal(jacksonDataFormat)
.to("http:10.101.111.19:1080/api/json/material");
from("activemq:topic:Material?clientId=2&durableSubscriptionName=FilteredIsDeleted&acknowledgementModeName=CLIENT_ACKNOWLEDGE")
.filter().xpath("//is_deleted='false'")
.to("activemq:queue:FilteredIsDeleted");
from("activemq:queue:FilteredIsDeleted")
.unmarshal(xmlDataFormat)
.process(new CSVProcess())
.to("sftp:test@10.101.111.19:2222/upload/test?password=admin");
}
});
context.start();
ProducerTemplate template = context.createProducerTemplate();
Path path = Paths.get("/home/igor/Documents/Jacek/jacek-and-igor/src/main/resources/test.xml");
ArrayList<String> readAllLines = (ArrayList<String>) Files.readAllLines(path);
String allFile = "";
for (int i = 0; i < readAllLines.size(); i++) {
allFile+=readAllLines.get(i);
}
//template.sendBody("direct:start", null);
template.sendBody("activemq:queue:siema", allFile);
template.sendBody("direct:start", null);
context.stop();
}
}

View File

@@ -6,11 +6,8 @@ import generated.MaterialTypeType;
import generated.Package;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class MergeAggregator implements AggregationStrategy {
@Override
@@ -28,10 +25,6 @@ public class MergeAggregator implements AggregationStrategy {
material.setDescription((String) oldEx.get("description"));
material.setIsDeleted((Boolean) oldEx.get("is_deleted"));
oldExchange.getIn().setHeader("dimension", true);
oldExchange.getIn().setHeader("ean", true);
for (int i = 0; i < newEx.size(); i++) {
Package p = new Package();
p.setId((Integer) newEx.get(i).get("id"));

View File

@@ -0,0 +1,34 @@
package com.release11.Processors;
import generated.MaterialType;
import generated.Package;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class CSVProcess implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
MaterialType material = exchange.getIn().getBody(MaterialType.class);
String result = "";
result+=material.getId()+",";
result+=material.getMaterialNumber()+",";
result+=material.getMaterialType()+",";
result+=material.getMaterialName()+",";
result+=material.getDescription()+";";
for (Package p: material.getPackages()) {
String pack = "";
pack+=p.getId()+",";
pack+=p.getPackageNumber()+",";
pack+=p.getMaterialId()+",";
pack+=p.getEan()+",";
pack+=p.getUnitOfMeasure()+",";
pack+=p.getDimension()+",";
pack+=p.getDescription()+";";
result+=pack;
}
exchange.getIn().setBody(result);
}
}

View File

@@ -1,4 +1,4 @@
package com.release11;
package com.release11.Processors;
import generated.MaterialType;
import generated.Package;
@@ -15,10 +15,14 @@ public class ValidatorProcess implements Processor {
Pattern pattern = Pattern.compile("\\d\\dx\\d\\dx\\d\\d");
for (Package p: material.getPackages()) {
Matcher matcher = pattern.matcher(p.getDimension());
if (!matcher.matches())
if (!matcher.matches()) {
exchange.setProperty("dimension", false);
if(!isEanGood(p.getEan()))
break;
}
if (!isEanGood(p.getEan())) {
exchange.setProperty("ean", false);
break;
}
}
}

View File

@@ -0,0 +1,16 @@
package com.release11.Processors;
import generated.MaterialType;
import generated.MaterialTypeType;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class XMLProcess implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
MaterialType material = exchange.getIn().getBody(MaterialType.class);
System.out.println(material.toString());
exchange.getIn().setBody(material);
}
}

View File

@@ -1,14 +0,0 @@
package com.release11;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class XMLProcess implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
String tmp = exchange.getIn().getBody(String.class);
System.out.println(tmp);
exchange.getIn().setBody(tmp);
}
}