Я работаю над проектом на Spark Streaming на Java. Я пытаюсь отправлять сообщения из Spark в Apache Kafka, используя Java API kafka-producer. Поскольку создавать экземпляр KafkaProducer для каждого элемента очень дорого, я пытаюсь использовать пул продюсеров на основе библиотеки Apache Commons Pool. Как показано во фрагменте кода ниже, я создаю экземпляр GenericObjectPool и рассылаю его как broadcast-переменную:
// Build a commons-pool2 GenericObjectPool of Kafka producers, backed by a
// KafkaProducerFactory that is constructed from `prop`.
// NOTE(review): KafkaProducerFactory is used as a raw type here (no <String, String>);
// confirm the declared type of `prop` before adding type arguments.
GenericObjectPool<KafkaProducer<String, String>> producerPool = new GenericObjectPool<KafkaProducer<String, String>>(
new KafkaProducerFactory(prop));
// Hand the pool object itself to Spark as a broadcast variable. This is the call that
// fails: the reported serialization trace walks GenericObjectPool.factoryClassLoader
// into the application class loader while Kryo is writing the broadcast value.
final Broadcast<GenericObjectPool<KafkaProducer<String, String>>> pool = ssc.sparkContext() .broadcast(producerPool); //**Causing exception**
Код класса KafkaProducerFactory приведён ниже:
import java.io.Serializable;
import java.util.Map;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.kafka.clients.producer.KafkaProducer;
/**
 * Commons-pool2 object factory that creates, wraps and closes {@link KafkaProducer}
 * instances, all built from one configuration map supplied at construction time.
 *
 * <p>The class is declared {@link Serializable}. NOTE(review): Java serialization of an
 * instance also requires the supplied {@code Map} implementation to be serializable;
 * verify against the caller.
 *
 * @param <K> key type of the producers created by this factory
 * @param <V> value type of the producers created by this factory
 */
public class KafkaProducerFactory<K, V> extends BasePooledObjectFactory<KafkaProducer<K, V>>
        implements Serializable {

    /** Explicit version id so the serialized form does not depend on compiler-generated values. */
    private static final long serialVersionUID = 1L;

    /** Producer settings passed unchanged to every {@link KafkaProducer} this factory creates. */
    private final Map<String, Object> configs;

    /**
     * Creates a factory that builds producers from the given settings.
     *
     * @param configs producer configuration; the reference is kept as-is (not copied)
     * @throws IllegalArgumentException if {@code configs} is {@code null}
     */
    public KafkaProducerFactory(Map<String, Object> configs) {
        // Fail at construction time rather than later inside create(), where the
        // failure would surface far away from the code that supplied the bad argument.
        if (configs == null) {
            throw new IllegalArgumentException("configs must not be null");
        }
        this.configs = configs;
    }

    /**
     * Instantiates a new producer from the stored configuration.
     *
     * @return a freshly constructed producer
     */
    @Override
    public KafkaProducer<K, V> create() {
        return new KafkaProducer<K, V>(this.configs);
    }

    /**
     * Wraps a producer in the pool's bookkeeping object.
     *
     * @param producer the producer to wrap
     * @return a {@link DefaultPooledObject} holding {@code producer}
     */
    @Override
    public PooledObject<KafkaProducer<K, V>> wrap(KafkaProducer<K, V> producer) {
        return new DefaultPooledObject<KafkaProducer<K, V>>(producer);
    }

    /**
     * Closes the wrapped producer.
     *
     * @param obj the pooled wrapper whose producer should be closed
     */
    @Override
    public void destroyObject(PooledObject<KafkaProducer<K, V>> obj) {
        obj.getObject().close();
    }
}
Но отмеченная выше строка с вызовом broadcast выбрасывает следующее исключение:
com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Полная трассировка стека приведена ниже:
Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
referent (java.lang.ref.WeakReference)
factoryClassLoader (org.apache.commons.pool2.impl.GenericObjectPool)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1291)
at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
at com.veda.txt.spark.Engine.start(Engine.java:63)
at com.veda.txt.spark.Engine.main(Engine.java:126)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1127)
at java.util.Vector$Itr.next(Vector.java:1104)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
... 39 more
15/08/26 20:38:14 INFO SparkContext: Invoking stop() from shutdown hook
Пожалуйста, подскажите, что я делаю не так.
Спасибо