首页 > 其他 > 详细

airflow-kafka-use-demo

时间:2019-10-23 18:47:52      阅读:395      评论:0      收藏:0      [点我收藏+]

场景:

使用airflow: 处理kafkfa  consumer 的数据:

 

myconsume.pyr:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
import json
import logging 
import os

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import airflow.utils
from kafka import KafkaConsumer

working_file_suffix=".json"
working_kafka=192.168.18.129:9092
working_topic="airflow.topic"
woking_consumer_group="haha"
working_dag="airflow trigger_dag "


# 定义默认参数
default_args = {
    owner: airflow,  # 拥有者名称
        start_date: airflow.utils.dates.days_ago(1),#  第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期
    email: [lshan523@163.com],  # 接收通知的email列表
    email_on_failure: True,  # 是否在任务执行失败时接收邮件
    email_on_retry: True,  # 是否在任务重试时接收邮件
    retries: 3,  # 失败重试次数
    retry_delay: timedelta(seconds=5)  # 失败重试间隔
}

# 定义DAG
dag = DAG(
    dag_id=kafkaconsumer,  # dag_id
    default_args=default_args,  # 指定默认参数
    # schedule_interval="00, *, *, *, *"  # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行
#     schedule_interval=timedelta(minutes=1)  # 执行周期,表示每分钟执行一次
)  



def store(working_directory,data):
    dt_ms = datetime.datetime.now().strftime(%Y-%m-%d_%H_%M_%S_%f)
    task_file=working_directory+dt_ms+_data+working_file_suffix
    with open(task_file, w) as fw:
        json.dump(data,fw)  # Store the json data to the file      
    return task_file  
  
‘‘‘
cankao :https://blog.csdn.net/yf_li123/article/details/84075588
os.system( “airflow trigger_dag <dag_id> -c <json格式的字符串参数>”)
key  :要触发的dag_id
value: json格式的字符串参数>
‘‘‘          
def start_dags(key,value):
        logging.info("Start triggering DAG to handle it  ") 
        #To avoid error cases , it‘s better pass the task name too 
        ###airflow trigger_dag Update_Shopify_OrderStatus_Handle -c ‘{"working_time":"zzzzz"}‘  
        logging.info("key is "+str(key))    
        logging.info("value is "+str(value))    
        json_data={}
        json_data["task"]=value
#         airflow trigger_dag <dag_id> -c <json格式的字符串参数>
        job_command=working_dag+str(key)+" -c ‘"+json.dumps(json_data)+""
        os.system(job_command)
 
def prepare_job_data(request_data):
    working_directory="/tmp/mydata/"
    #要触发的Dags 的id
    working_handle="MYtask"
    job_data={}
    task_file=store(working_directory,request_data)             
    job_data[working_handle]=task_file
    return job_data
   
          
def process_job(request_data):   
    if request_data:
        job_data=prepare_job_data(request_data)
        logging.info("####JOB DATA #########")
        if job_data:
            for key,value in job_data.items():
                start_dags(key,value) 


  
def main():
        consumer = KafkaConsumer(working_topic,group_id=woking_consumer_group,
                             bootstrap_servers=[working_kafka],value_deserializer=json.loads)    
        for message in consumer:
            if message:
                print(message.value)
                process_job(message.value)
                


# 定义要执行的task 1
consumer = PythonOperator(
    task_id=main,  # task_id
    python_callable=main,  # 指定要执行的函数
    dag=dag,  # 指定归属的dag
    retries=2,  # 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置
    provide_context=True,
)

    

# consumer
    
if __name__ == __main__:
    main()

 

 

consumer 触发的task:

MYtask.py

#coding=utf-8
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

from datetime import timedelta,datetime as datetime1
import datetime as datetime2
import json
dt = datetime1.utcnow()-datetime2.timedelta(minutes=10)
# 定义默认参数
default_args = {
    owner: airflow,
    depends_on_past: False,
    start_date: datetime1(dt.year,dt.month,dt.day,dt.hour),
     #‘start_date‘: airflow.utils.dates.days_ago(2), 第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期
    email: [sealiu@icil.net],
    email_on_failure: False,
    email_on_retry: False,
    retries: 0,
    retry_delay: timedelta(minutes=1),
}
dag = DAG(
    dag_id=MYtask,  # dag_id
    default_args=default_args,  # 指定默认参数
    # schedule_interval="00, *, *, *, *"  # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行
    #schedule_interval=timedelta(minutes=60)  # 执行周期,表示每分钟执行一次
)

#This method is to load one message from one file  
def load_message(file_path):
    with open(file_path) as f:
        message = json.load(f)
        return message 


# 定义要执行的Python函数1
def hello_world_args_1(**context):
    #获取刚刚放入的参数
    task=context.get(dag_run).conf.get(task)
    print("############ task -->  "+str(task))
#     order_files=[]
    if task: 
#         order_files.append(task)
        order_file=task 
        order_data=load_message(order_file)
        current_time = str(datetime.today())
        with open(/tmp/kafka-test.txt, a) as f:
            f.write(%s\n % current_time)
            f.write(%s\n % order_data)
        assert 1 == 1  # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常
    #     context[‘task_instance‘].xcom_push(key=‘sea1‘, value="seaseseseaaa")


# 定义要执行的task 1
task1= BranchPythonOperator(
    task_id=hello_world_args_1,  # task_id
    provide_context=True,
    retries=2,  # 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置
    python_callable=hello_world_args_1,  # 指定要执行的函数
    dag=dag,  # 指定归属的dag
)




task1

    

 

airflow-kafka-use-demo

原文:https://www.cnblogs.com/lshan/p/11727882.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!