python pycharm trading zipline quantopian

Is there a way to create a pipeline locally using zipline?


I have set up zipline locally in PyCharm. The simulations work; moreover, I have access to premium data from Quandl (which was automatically updated when I entered my API key). However, my question now is: how do I make a pipeline locally using zipline?


Solution

  • Zipline's documentation is challenging. Zipline.io (as of 2021-04-05) is also down. Fortunately, Blueshift has documentation and sample code that show how to make a pipeline that can be run locally:


    """
        Title: Classic (Pedersen) time-series momentum (equal weights)
        Description: This strategy uses past returns and goes long (short)
                    the positive (negative) n-percentile
        Style tags: Momentum
        Asset class: Equities, Futures, ETFs, Currencies
        Dataset: All
    """
    
    """
    Sources:
    Overall Algorithm here:
    https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/factors/time_series_momentum.py
    
    Custom (Ave Vol Filter, Period Returns) Functions Here: 
    https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/library/pipelines/pipelines.py
    """
    
    import numpy as np
    
    from zipline.pipeline import CustomFilter, CustomFactor, Pipeline
    from zipline.pipeline.data import EquityPricing
    from zipline.api import (
        order_target_percent,
        schedule_function,
        date_rules,
        time_rules,
        attach_pipeline,
        pipeline_output,
    )
    
    
    def average_volume_filter(lookback, amount):
        """
           Build a pipeline filter that keeps only high dollar-volume assets.

           For every asset the filter averages ``close * volume`` over the
           window and passes the asset when that average is strictly greater
           than ``amount``.

           Args:
               lookback (int): window length handed to the filter
               amount (int): threshold the averaged close * volume must exceed

           Returns:
               An instance of a ``CustomFilter`` subclass created with
               ``window_length=lookback``

           Examples::

               pipe = Pipeline()
               volume_filter = average_volume_filter(200, 1000000)
               pipe.set_screen(volume_filter)
        """

        class AvgDailyDollarVolumeTraded(CustomFilter):
            inputs = [EquityPricing.close, EquityPricing.volume]

            def compute(self, today, assets, out, closes, volumes):
                # Collapse axis 0 of the window (presumably the lookback
                # rows), leaving one averaged value per remaining column,
                # then apply the high-pass threshold captured from the
                # enclosing call.
                out[:] = np.mean(closes * volumes, axis=0) > amount

        volume_filter = AvgDailyDollarVolumeTraded(window_length=lookback)
        return volume_filter
    
    
    def period_returns(lookback):
        """
           Build a pipeline factor holding the simple return over a window.

           The factor divides the last row of the close-price window by its
           first row and subtracts one.

           Args:
               lookback (int): window length handed to the factor

           Returns:
               An instance of a ``CustomFactor`` subclass created with
               ``window_length=lookback``

           Examples::

               pipe = Pipeline()
               momentum = period_returns(200)
               pipe.add(momentum,'momentum')
        """

        class SignalPeriodReturns(CustomFactor):
            inputs = [EquityPricing.close]

            def compute(self, today, assets, out, closes):
                # Row 0 is used as the starting price and row -1 as the
                # ending price of the window.
                out[:] = closes[-1] / closes[0] - 1

        returns_factor = SignalPeriodReturns(window_length=lookback)
        return returns_factor
    
    
    def initialize(context):
        """
            Set up the strategy state, its schedule and its pipeline.

            Stores the strategy parameters on ``context.params``, schedules
            ``strategy`` and attaches the pipeline under the name
            'strategy_pipeline'.
        """
        # Parameters are kept on the context so the other functions can read
        # them. 'lookback' is multiplied by 21 in make_strategy_pipeline.
        context.params = dict(lookback=12, percentile=0.1, min_volume=1E7)

        # Run the strategy on the month_start() date rule, at the
        # market_close(minutes=1) time rule.
        schedule_function(
            strategy,
            date_rules.month_start(),
            time_rules.market_close(minutes=1),
        )

        # The pipeline is built from context.params, so it has to be created
        # after the parameters above are in place.
        strategy_pipeline = make_strategy_pipeline(context)
        attach_pipeline(strategy_pipeline, name='strategy_pipeline')
    
    
    def strategy(context, data):
        """
            Scheduled entry point: refresh the signals, then rebalance.
        """
        # Order matters: rebalance reads the securities that
        # generate_signals stores on the context.
        for step in (generate_signals, rebalance):
            step(context, data)
    
    
    def make_strategy_pipeline(context):
        """
            Build the pipeline that feeds the strategy.

            The pipeline has a single 'momentum' column (the period return)
            and is screened by the average dollar-volume filter. Both terms
            use a window of ``context.params['lookback'] * 21`` rows.

            Args:
                context: strategy context; ``params['lookback']`` and
                    ``params['min_volume']`` are read

            Returns:
                The configured ``Pipeline``
        """
        params = context.params
        window = params['lookback'] * 21

        # Liquidity screen first, then the momentum column.
        liquid = average_volume_filter(window, params['min_volume'])
        past_returns = period_returns(window)

        return Pipeline(columns={'momentum': past_returns}, screen=liquid)
    
    
    def generate_signals(context, data):
        """
            Pick the long and short legs from the latest pipeline output.

            Reads the 'strategy_pipeline' output, splits it into positive and
            negative momentum, and stores the selection on the context:

            * ``context.long_securities``: the ``n`` highest positive values
            * ``context.short_securities``: the ``n`` most negative values

            where ``n = int(min(#positive, #negative) * percentile)``. Both
            attributes are set to empty lists when the pipeline output cannot
            be read or when ``n`` is not positive.

            Args:
                context: strategy context; ``params['percentile']`` is read
                data: only ``data.current_dt`` is used, for the no-signal
                    message
        """
        try:
            pipeline_results = pipeline_output('strategy_pipeline')
        except Exception:
            # Deliberate best effort: without pipeline data the strategy
            # holds no signals. ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt / SystemExit still propagate.
            context.long_securities = []
            context.short_securities = []
            return

        p = context.params['percentile']
        momentum = pipeline_results

        # Masking leaves NaN where the condition fails; dropna removes those
        # rows, and the ascending sort puts the most extreme values at the
        # end (longs) or at the start (shorts).
        long_candidates = momentum[momentum > 0].dropna().sort_values('momentum')
        short_candidates = momentum[momentum < 0].dropna().sort_values('momentum')

        # Both legs get the same size, limited by the smaller side.
        n = int(min(len(long_candidates), len(short_candidates)) * p)

        if n < 1:
            print("{}, no signals".format(data.current_dt))
            context.long_securities = []
            context.short_securities = []
            # Returning here is essential: ``index[-0:]`` is ``index[0:]``,
            # so falling through would select every long candidate.
            return

        context.long_securities = long_candidates.index[-n:]
        context.short_securities = short_candidates.index[:n]
    
    
    def rebalance(context, data):
        """
            Move the portfolio to the current long and short selections.

            Does nothing when ``context.long_securities`` is empty. Otherwise
            every selected security gets a weight of ``0.5 / n`` (longs) or
            ``-0.5 / n`` (shorts), with ``n`` the number of longs, and any
            held position outside both selections is targeted to zero.

            Args:
                context: strategy context; ``long_securities``,
                    ``short_securities`` and ``portfolio.positions`` are read
                data: unused
        """
        longs = context.long_securities
        count = len(longs)
        if count < 1:
            return

        shorts = context.short_securities
        # Half of the book on each side, split equally by the long count.
        per_name = 0.5 / count

        # Close out whatever is held but no longer selected.
        for held in context.portfolio.positions:
            if not (held in longs or held in shorts):
                order_target_percent(held, 0)

        # Then target the new long and short weights.
        for asset in longs:
            order_target_percent(asset, per_name)
        for asset in shorts:
            order_target_percent(asset, -per_name)