Java EE in asinhrono izvajanje

Posted on Updated on

V tokratnem zapisu si bomo pogledali, kako izdelati “cron job”, ki bo izbrano množico nizov (to so enote dela) optimalno razpršil med vnaprej določeno število vzporednih in asinhronih izvajanj tj. workerjev.

Naloga workerja bo ta, da bo enostavno izpisal enoto dela oz. podani niz. Ko bodo vsi workerji zasedeni, bo job čakal na prvega prostega in ob končanju izvajanja določenega workerja, avtomatsko zagnal novega.  Razprševanje dela, pa se bo izvajalo enkrat na 15 sekund.

@Asynchronous

Kot vemo nam Java EE 6 neposredno ne dovoli uporabljati večnitnosti, saj za nitnost skrbi kontejner EE (in njegov Threading Pool). Ampak session beans (EJB) lahko implementirajo asinhrone metode. Metode postane asinhrone, če jih označimo z anotacijo @Asynchronous.

Velja naslednje, da bean, ki kliče tako metodo, dobi kontrolo nazaj takoj po klicu in predenj se metoda sploh začne izvajati. Dodatno, asinhrona metoda lahko vrne implementacijo java.util.concurrent.Future<V>, preko katere lahko klicajoč bean preverja, kaj se dogaja z asinhronim izvajanjem, npr. ali je izvajanje še aktivno itd.

Glede izvajanja pa bo kontejner EE bo poiskal novo nit v svojem bazenu niti in ji predal izvajanje. Celo več, v skladu s EJBji in propagacijo transakcij, se bo to izvajanje zgodilo znotraj iste transakcije. Če želimo kreirati novo transakcijo, lahko to naredimo s ustrezno anotacijo:

@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)

Izdelava workerja

Torej potrebujemo session bean (stateless), ki bo implementiral asinhrono metodo. Ta metoda bo vrnila implementacijo java.util.concurrent.Future<String>, zato moramo za naš session bean definirati tudi view, sicer local view. Namreč velja pravilo, da če želimo, da nam asinhrona metoda karkoli vrne, ne smemo uporabiti session bean brez view-a:

public interface AsyncWorker {
    public Future<String> work(String unit);
}
@Stateless
@Local(AsyncWorker.class)
public class DSLWorker implements AsyncWorker{

    @Asynchronous
    public Future<String> work(String unit) {
        System.out.println("…obdelujem enoto: " + unit);

        String status = "END";
        return new AsyncResult<String>(status);
    }
}

Zgoraj vidimo, da na koncu metode, vrnemo status po vrednosti “END” in to lahko po potrebi uporabimo v klicajočem beanu.

Izdelava cron joba

Prav tako potrebujemo session bean (stateless), ki pa bo zaradi potrebe po izvajanju vsake 15 sekund uporabil anotacijo @Schedule:

@Stateless
public class CronJobFacade {
     private int numOfExecutions = 3;

     @EJB 
     private AsyncWorker aworker;

     @Schedule(second="*/15", minute="*",hour="*", persistent=false)
     public void dispatch() {
         //SEZNAM ENOT DELA:
         List<String> list = new ArrayList<String>();
         list.add("A");
         list.add("B");
         list.add("C");
         list.add("D");

         //asinhrona izvajanja workerjev
         List<Future<String>> workers = new ArrayList<Future<String>>();

         for (String unitFromList : list) {
             //IMAMO PREVEČ WORKERJEV, ČAKAM DA SE VSAJ EDEN SPROSTI
             if(workers.size() > (numOfExecutions-1)) {

                 break_lvl_1:
                 while(true) {
                     for (Future<String> specificWorker : workers) {
                         if(specificWorker.isDone()) {
                             workers.remove(specificWorker);
                             break break_lvl_1;
                         }
                      } 
                 }
             }
             //DISPATCH
             Future<String> worker = aworker.work(unitFromList);
             workers.add(worker);

         }//for
     }
}

Zgoraj vidimo, da imamo seznam dela, ki ga moram asinhrono opraviti preko asinhronih klicev. Vnaprej določeno število vzporednih izvajanj (workerjev) nam onemogoča, da kreiramo toliko novih workerjev, kot je vseh enot dela. Namreč, ko dosežemo omejitev, začnemo v zanki čakati, da se določeno izvajanje konča. To naredimo s periodičnim preverjanjem oz klicem metode isDone() and implementacijo java.util.concurrent.Future<String>. Ta delo logike, bi lahko izvedli tudi kako drugače, vendar je prikazan lean način.

V resničnem življenju imamo kakšne bolj resne primere, kot je izpis nizov, npr.: worker kliče spletno storitev (JAX-WS) in njen rezultat pošlje na JMS vrsto in na tak način potem dosežemo paralelno klicanje storitev.

Izvorna koda za podan primer je dosegljiva na:

https://github.com/mpevec/blog/tree/master/ee-cron-async

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s