Java EE in asinhrono izvajanje
V tokratnem zapisu si bomo pogledali, kako izdelati “cron job”, ki bo izbrano množico nizov (to so enote dela) optimalno razpršil med vnaprej določeno število vzporednih in asinhronih izvajanj tj. workerjev.
Naloga workerja bo ta, da bo enostavno izpisal enoto dela oz. podani niz. Ko bodo vsi workerji zasedeni, bo job čakal na prvega prostega in ob končanju izvajanja določenega workerja, avtomatsko zagnal novega. Razprševanje dela, pa se bo izvajalo enkrat na 15 sekund.
@Asynchronous
Kot vemo nam Java EE 6 neposredno ne dovoli uporabljati večnitnosti, saj za nitnost skrbi kontejner EE (in njegov Threading Pool). Ampak session beans (EJB) lahko implementirajo asinhrone metode. Metode postane asinhrone, če jih označimo z anotacijo @Asynchronous.
Velja naslednje, da bean, ki kliče tako metodo, dobi kontrolo nazaj takoj po klicu in predenj se metoda sploh začne izvajati. Dodatno, asinhrona metoda lahko vrne implementacijo java.util.concurrent.Future<V>, preko katere lahko klicajoč bean preverja, kaj se dogaja z asinhronim izvajanjem, npr. ali je izvajanje še aktivno itd.
Glede izvajanja pa bo kontejner EE bo poiskal novo nit v svojem bazenu niti in ji predal izvajanje. Celo več, v skladu s EJBji in propagacijo transakcij, se bo to izvajanje zgodilo znotraj iste transakcije. Če želimo kreirati novo transakcijo, lahko to naredimo s ustrezno anotacijo:
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
Izdelava workerja
Torej potrebujemo session bean (stateless), ki bo implementiral asinhrono metodo. Ta metoda bo vrnila implementacijo java.util.concurrent.Future<String>, zato moramo za naš session bean definirati tudi view, sicer local view. Namreč velja pravilo, da če želimo, da nam asinhrona metoda karkoli vrne, ne smemo uporabiti session bean brez view-a:
public interface AsyncWorker { public Future<String> work(String unit); }
@Stateless @Local(AsyncWorker.class) public class DSLWorker implements AsyncWorker{ @Asynchronous public Future<String> work(String unit) { System.out.println("…obdelujem enoto: " + unit); String status = "END"; return new AsyncResult<String>(status); } }
Zgoraj vidimo, da na koncu metode, vrnemo status po vrednosti “END” in to lahko po potrebi uporabimo v klicajočem beanu.
Izdelava cron joba
Prav tako potrebujemo session bean (stateless), ki pa bo zaradi potrebe po izvajanju vsake 15 sekund uporabil anotacijo @Schedule:
@Stateless public class CronJobFacade { private int numOfExecutions = 3; @EJB private AsyncWorker aworker; @Schedule(second="*/15", minute="*",hour="*", persistent=false) public void dispatch() { //SEZNAM ENOT DELA: List<String> list = new ArrayList<String>(); list.add("A"); list.add("B"); list.add("C"); list.add("D"); //asinhrona izvajanja workerjev List<Future<String>> workers = new ArrayList<Future<String>>(); for (String unitFromList : list) { //IMAMO PREVEČ WORKERJEV, ČAKAM DA SE VSAJ EDEN SPROSTI if(workers.size() > (numOfExecutions-1)) { break_lvl_1: while(true) { for (Future<String> specificWorker : workers) { if(specificWorker.isDone()) { workers.remove(specificWorker); break break_lvl_1; } } } } //DISPATCH Future<String> worker = aworker.work(unitFromList); workers.add(worker); }//for } }
Zgoraj vidimo, da imamo seznam dela, ki ga moram asinhrono opraviti preko asinhronih klicev. Vnaprej določeno število vzporednih izvajanj (workerjev) nam onemogoča, da kreiramo toliko novih workerjev, kot je vseh enot dela. Namreč, ko dosežemo omejitev, začnemo v zanki čakati, da se določeno izvajanje konča. To naredimo s periodičnim preverjanjem oz klicem metode isDone() and implementacijo java.util.concurrent.Future<String>. Ta delo logike, bi lahko izvedli tudi kako drugače, vendar je prikazan lean način.
V resničnem življenju imamo kakšne bolj resne primere, kot je izpis nizov, npr.: worker kliče spletno storitev (JAX-WS) in njen rezultat pošlje na JMS vrsto in na tak način potem dosežemo paralelno klicanje storitev.
Izvorna koda za podan primer je dosegljiva na: