Узи, дата, время, начало

У меня есть собственный собственный источник, работающий в моем файле flume.config, который отвечает за извлечение данных со страницы Facebook каждый час.

Мне интересно, есть ли способ установить период извлечения со временем запуска моего координатора?

Например, я установил свой координатор на запуск 01.01.2015 в 12:00, тогда мой лоток начинает добычу в то же время.

Это мой пользовательский источник:

public class FacebookPageFansCitySource extends AbstractPollableSource
{
    private String accessToken;
    private String pageId;
    private int refreshInterval;

    private FacebookClient facebookClient;

    private volatile long lastPoll = 0;

    @Override
    protected void doConfigure(Context context) throws FlumeException
    {
        this.accessToken = context.getString(FacebookSourceConstants.ACCESS_TOKEN_KEY);
        this.pageId = context.getString(FacebookSourceConstants.PAGE_ID_KEY);
        this.refreshInterval = context.getInteger(FacebookSourceConstants.REFRESH_INTERVAL_KEY);

        facebookClient = new DefaultFacebookClient(accessToken, Version.VERSION_2_2);
    }

    @Override
    protected void doStart() throws FlumeException
    {
    }

    @Override
    protected void doStop() throws FlumeException
    {
    }

    @Override
    protected Status doProcess() throws EventDeliveryException
    {
        Status status = Status.BACKOFF;

        if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - lastPoll) > refreshInterval)
        {
            lastPoll = System.currentTimeMillis();

            try
            {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");

                simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

                Date date = new Date(lastPoll);

                String dateFormatted = simpleDateFormat.format(date);

                final Map<String, String> headers = new HashMap<String, String>();

                headers.put("timestamp", String.valueOf(date.getTime()));

                Insight insight = getInsight(pageId, "page_fans_city");

                if (insight != null)
                {
                    final List<Event> events = new ArrayList<Event>();

                    ChannelProcessor channelProcessor = getChannelProcessor();

                    List<JsonObject> values = insight.getValues();

                    for (JsonObject value : values)
                    {
                        String referenceDate = simpleDateFormat.format(DateUtils.toDateFromLongFormat(value.getString("end_time")));

                        JsonObject jsonObjectValue = value.getJsonObject("value");

                        for (Iterator<?> keys = jsonObjectValue.keys(); keys.hasNext(); )
                        {
                            String key = (String) keys.next();
                            Long count = jsonObjectValue.getLong(key);

                            JsonObject jsonObject = new JsonObject();

                            jsonObject.put("reference_date", referenceDate);
                            jsonObject.put("city", key);
                            jsonObject.put("count", count);
                            jsonObject.put("poll_time", dateFormatted);

                            Event event = EventBuilder.withBody(jsonObject.toString().getBytes(), headers);

                            events.add(event);
                        }
                    }

                    channelProcessor.processEventBatch(events);
                }

                status = Status.READY;
            }
            catch (Exception e)
            {
                Logger.getLogger(FacebookPageFansCitySource.class.getName()).log(Level.SEVERE, null, e);
            }
        }

        return status;
    }

    private Insight getInsight(String objectId, String metric)
    {
        TimeZone timeZone = TimeZone.getTimeZone("UTC");

        Calendar calendar = Calendar.getInstance(timeZone);

        calendar.add(Calendar.DAY_OF_MONTH, -4);

        Parameter parameterSince = Parameter.with("since", calendar.getTime());

        calendar.add(Calendar.DAY_OF_MONTH, 1);

        Parameter parameterUntil = Parameter.with("until", calendar.getTime());

        Connection<Insight> responseListInsight = facebookClient.fetchConnection(objectId + "/insights/" + metric, Insight.class, parameterSince, parameterUntil);

        if (responseListInsight != null && !responseListInsight.getData().isEmpty())
            return responseListInsight.getData().get(0);
        else
            return null;
    }
}

Спасибо за помощь.


person Gabriel Braga    schedule 06.05.2015    source источник


Ответы (1)


Как насчет создания действия Java, и настройте свойство рабочего процесса, которое использует текущее время координатора.

<property>
    <name>myStart</name>
    <value>${coord:current(0)}</value>
</property>

Затем используйте это свойство в своем действии в качестве параметра.

person kecso    schedule 06.05.2015
comment
Ясно, что, сделав это, я перестану использовать Flume для извлечения и сделаю извлечение непосредственно моим координатором. Верно? - person Gabriel Braga; 07.05.2015
comment
Я не знаком с Flume, но Oozie способен выполнять действия Java. Если это решит вашу проблему, я бы сказал да. - person kecso; 07.05.2015
comment
тогда попробую - person Gabriel Braga; 07.05.2015