diff --git a/src/main/java/com/release11/Main.java b/src/main/java/com/release11/Main.java index 1404495..bff3989 100644 --- a/src/main/java/com/release11/Main.java +++ b/src/main/java/com/release11/Main.java @@ -3,10 +3,7 @@ package com.release11; import com.mysql.cj.jdbc.MysqlDataSource; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; -import org.apache.camel.AggregationStrategy; -import org.apache.camel.CamelContext; -import org.apache.camel.Expression; -import org.apache.camel.ProducerTemplate; +import org.apache.camel.*; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.impl.DefaultCamelContext; @@ -47,28 +44,27 @@ public class Main { @Override public void configure() throws Exception { - from("direct:start").to("jdbc:source") + from("direct:start") + .setBody(constant("SELECT * FROM material LIMIT 10")) + .to("jdbc:source") .split(body()) - //.bean(myFunctions, "splitMaterial") - .to("activemq:queue:material") - .to("log:?level=INFO&showBody=true"); + .to("activemq:queue:material"); from("activemq:queue:material") .split(body()) //.bean(myFunctions, "getId") - .setHeader("id", constant("${body[id]}")) + .setHeader("id", simple("${body[id]}")) .setBody(constant("SELECT * FROM package WHERE material_id = :?id")) .to("jdbc:source?useHeadersAsParameters=true") - .to("activemq:queue:materialPackage") - .to("log:?level=INFO&showBody=true"); + .to("activemq:queue:materialPackage"); + //.to("log:?level=INFO&showBody=true"); } }); context.start(); ProducerTemplate template = context.createProducerTemplate(); - template.sendBody("direct:start", "SELECT * FROM material"); - + template.sendBody("direct:start", null); Thread.sleep(1000); //template.setDefaultEndpointUri("activemq:queue:material"); diff --git a/src/main/java/com/release11/MyAggregator.java b/src/main/java/com/release11/MyAggregator.java new file mode 100644 index 0000000..f82b885 --- /dev/null +++ b/src/main/java/com/release11/MyAggregator.java @@ -0,0 +1,17 @@ +package com.release11; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; + +public class MyAggregator implements AggregationStrategy { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + + String oldEx = oldExchange.getIn().getBody().toString(); + System.out.println(oldEx); + String newEx = newExchange.getIn().getBody().toString(); + System.out.println(newEx); + + return null; + } +} diff --git a/target/classes/com/release11/Main$1.class b/target/classes/com/release11/Main$1.class index 943353e..06d02fc 100644 Binary files a/target/classes/com/release11/Main$1.class and b/target/classes/com/release11/Main$1.class differ diff --git a/target/classes/com/release11/Main.class b/target/classes/com/release11/Main.class index d104523..0e35f97 100644 Binary files a/target/classes/com/release11/Main.class and b/target/classes/com/release11/Main.class differ