Skip to content

Hari spark manager

session.hari_spark_session_manager

HariSparkSessionManager

Source code in hari_data/session/hari_spark_session_manager.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class HariSparkSessionManager:
    """
    Singleton manager that builds, hands out and stops one Spark session.

    Every call to ``HariSparkSessionManager()`` returns the same object until
    ``stop_spark_session`` releases it.
    """

    # The one shared manager instance (None until first construction).
    _instance: Optional['HariSparkSessionManager'] = None
    # The Spark session created by ``configure`` (None while unconfigured).
    _spark_session: Optional[SparkSession] = None
    # Logger obtained from ``logger_manager`` during ``configure``.
    _logger = None
    # Guards creation of the shared instance in ``__new__``.
    _lock: Lock = Lock()

    # Maps an environment name to the class used to build its Spark session.
    __factories: Dict[str, BaseHariSparkSession] = {
        'local': HariSparkSessionGeneric,
    }

    def __new__(cls):
        """
        Return the shared instance, creating it on first use.

        The instance check is repeated inside the lock so that two threads
        racing through the first check cannot both create an instance.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    @property
    def is_configured(self) -> bool:
        """
        Check if the Spark session is configured.

        Returns:
            bool: True if the Spark session is configured, False otherwise.

        Examples:
            >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
            >>> HariSparkSessionManager().is_configured
            False

            >>> HariSparkSessionManager._spark_session = 'spark_session_mock'
            >>> HariSparkSessionManager().is_configured
            True
        """
        return self._spark_session is not None

    def configure(
        self,
        env: str,
        configs_path: Optional[str] = './configs/configs.yaml',
        app_name: Optional[str] = 'HARI_JOB_DEFAULT',
        log_level: Optional[str] = 'INFO',
        master_url: Optional[str] = 'local[*]',
        jars_path: Optional[str] = None,
        spark_extras: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        Configure the Spark session based on the specified environment.

        If a session is already configured the call prints a notice and
        returns without changing anything. Values found in the YAML file take
        precedence over the corresponding arguments.

        Parameters:
            env (str): The environment to configure the Spark session for (e.g., 'local').
            configs_path (Optional[str]): Path to a YAML configuration file that may contain
                                'app_name', 'log_level', 'master_url', and 'jars_path' settings.
                                Pass None (or an empty string) to skip reading a file.
                                Default is './configs/configs.yaml'.
            app_name (Optional[str]): The name of the Spark application. Default is 'HARI_JOB_DEFAULT'.
            log_level (Optional[str]): The logging level for Spark (e.g., 'DEBUG', 'INFO', 'WARNING', 'ERROR',
                                        'CRITICAL'). Default is 'INFO'.
            master_url (Optional[str]): The master URL for the Spark session. Default is 'local[*]'.
            jars_path (Optional[str]): Path to the directory containing JAR files to include in the Spark session.
                                    Default is None.
            spark_extras (Optional[Dict[str, str]]): Additional Spark configuration options, given as a
                                    dictionary of option names to values. Default is None.

        Raises:
            ValueError: If the specified environment is not supported or if configuration parameters are invalid.

        Examples:
            >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
            >>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
            Spark session configured for environment: local

            >>> HariSparkSessionManager().configure(env='local', configs_path=None)
            Spark session is already configured. Skipping reconfiguration.

            >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
            >>> HariSparkSessionManager().configure(env='123')
            Traceback (most recent call last):
                ...
            ValueError: Unsupported environment '123'. Supported environments: ['local'].

        """

        if self.is_configured:
            print(
                'Spark session is already configured. Skipping reconfiguration.'
            )
            return

        configs = {}

        if configs_path:
            # Best effort: any failure to locate or parse the file falls back
            # to the arguments passed by the caller.
            try:
                configs_path = validate_path_exists(
                    configs_path, 'configs_path'
                )['value']
                # Guard against a falsy result (e.g. if the helper returns
                # None for an empty file) so the lookups below cannot fail.
                configs = read_yaml_to_dict(configs_path) or {}

            except Exception as e:
                print(
                    f'Error reading configs from {configs_path}: {e}. Using Provided Parameters.'
                )

        # File settings win over the explicit arguments.
        app_name = configs.get('app_name', app_name)
        log_level = configs.get('log_level', log_level)
        master_url = configs.get('master_url', master_url)
        jars_path = configs.get('jars_path', jars_path)

        if not self._logger:
            logger_manager.configure(
                app_name=app_name,
                log_level=log_level,
            )

        self._logger = logger_manager.get_logger()['logger']

        # Validate and normalise the environment name once; factory keys are
        # lower-case.
        env_key = validate_non_empty_string(env, 'env')['value'].lower()
        factory_class = self.__factories.get(env_key)

        if not factory_class:
            raise ValueError(
                f"Unsupported environment '{env_key}'. Supported environments: {list(self.__factories.keys())}."
            )

        factory: BaseHariSparkSession = factory_class(
            app_name=app_name,
            master_url=master_url,
            spark_log_level=log_level,
            jars_path=jars_path,
        )

        spark: Dict[str, SparkSession] = factory.create_spark_session(
            spark_extras=spark_extras
        )

        self._spark_session = spark.get('spark_session')
        print(f'Spark session configured for environment: {env_key}')

    def get_spark_session(self) -> Dict[str, SparkSession]:
        """
        Get the configured Spark session.

        Returns:
            Dict[str, SparkSession]: A dictionary containing the SparkSession instance
                under the 'spark_session' key.

        Raises:
            ValueError: If the Spark session has not been configured.

        Examples:
            >>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
            Spark session configured for environment: local
            >>> HariSparkSessionManager().get_spark_session()
            {'spark_session': <pyspark.sql.session.SparkSession object at 0x...>}

            >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
            >>> HariSparkSessionManager().get_spark_session()
            Traceback (most recent call last):
                ...
            ValueError: Spark session is not configured. Please call 'configure' first.
        """
        if self.is_configured:
            return {'spark_session': self._spark_session}
        raise ValueError(
            "Spark session is not configured. Please call 'configure' first."
        )

    def stop_spark_session(self) -> None:
        """
        Stop the Spark session and reset the manager.

        Does nothing when no session is configured. Otherwise the session is
        stopped, the stored session is cleared on both the instance and the
        class, and the shared instance is released so that the next
        ``HariSparkSessionManager()`` call builds a fresh manager.

        Examples:
            >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
            >>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
            Spark session configured for environment: local
            >>> HariSparkSessionManager().stop_spark_session()
            Spark session stopped and manager reset.
            >>> HariSparkSessionManager().stop_spark_session()
        """
        if not self.is_configured:
            return
        self._spark_session.stop()

        manager_cls = type(self)
        self._spark_session = None
        # Also clear a session stored directly on the class, otherwise a
        # fresh instance would still report itself as configured.
        manager_cls._spark_session = None
        # Assigning through ``self`` would only shadow the class attribute;
        # the singleton has to be released on the class itself. Leave it alone
        # if this object is no longer the registered instance.
        if manager_cls._instance is self:
            manager_cls._instance = None
        print('Spark session stopped and manager reset.')

is_configured property

Check if the Spark session is configured.

Returns:

Name Type Description
bool bool

True if the Spark session is configured, False otherwise.

Examples:

>>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
>>> HariSparkSessionManager().is_configured
False
>>> HariSparkSessionManager._spark_session = 'spark_session_mock'
>>> HariSparkSessionManager().is_configured
True

configure(env, configs_path='./configs/configs.yaml', app_name='HARI_JOB_DEFAULT', log_level='INFO', master_url='local[*]', jars_path=None, spark_extras=None)

Configure the Spark session based on the specified environment.

Parameters:

Name Type Description Default
env str

The environment to configure the Spark session for (e.g., 'local').

required
configs_path str

Path to a YAML configuration file that may contain 'app_name', 'log_level', 'master_url', and 'jars_path' settings. Default is './configs/configs.yaml'.

'./configs/configs.yaml'
app_name Optional[str]

The name of the Spark application. Default is 'HARI_JOB_DEFAULT'.

'HARI_JOB_DEFAULT'
log_level Optional[str]

The logging level for Spark (e.g., 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'). Default is 'INFO'.

'INFO'
master_url Optional[str]

The master URL for the Spark session (e.g., 'local[*]'). Default is 'local[*]'.

'local[*]'
jars_path Optional[str]

Path to the directory containing JAR files to include in the Spark session. Default is None.

None
spark_extras Optional[Dict[str, str]]

Additional Spark configuration options, given as a dictionary of option names to values.

None

Raises:

Type Description
ValueError

If the specified environment is not supported or if configuration parameters are invalid.

Examples:

>>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
>>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
Spark session configured for environment: local
>>> HariSparkSessionManager().configure(env='local', configs_path=None)
Spark session is already configured. Skipping reconfiguration.
>>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
>>> HariSparkSessionManager().configure(env='123')
Traceback (most recent call last):
    ...
ValueError: Unsupported environment '123'. Supported environments: ['local'].
Source code in hari_data/session/hari_spark_session_manager.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def configure(
    self,
    env: str,
    configs_path: Optional[str] = './configs/configs.yaml',
    app_name: Optional[str] = 'HARI_JOB_DEFAULT',
    log_level: Optional[str] = 'INFO',
    master_url: Optional[str] = 'local[*]',
    jars_path: Optional[str] = None,
    spark_extras: Optional[Dict[str, str]] = None,
) -> None:
    """
    Configure the Spark session based on the specified environment.

    If a session is already configured the call prints a notice and
    returns without changing anything. Values found in the YAML file take
    precedence over the corresponding arguments.

    Parameters:
        env (str): The environment to configure the Spark session for (e.g., 'local').
        configs_path (Optional[str]): Path to a YAML configuration file that may contain
                            'app_name', 'log_level', 'master_url', and 'jars_path' settings.
                            Pass None (or an empty string) to skip reading a file.
                            Default is './configs/configs.yaml'.
        app_name (Optional[str]): The name of the Spark application. Default is 'HARI_JOB_DEFAULT'.
        log_level (Optional[str]): The logging level for Spark (e.g., 'DEBUG', 'INFO', 'WARNING', 'ERROR',
                                    'CRITICAL'). Default is 'INFO'.
        master_url (Optional[str]): The master URL for the Spark session. Default is 'local[*]'.
        jars_path (Optional[str]): Path to the directory containing JAR files to include in the Spark session.
                                Default is None.
        spark_extras (Optional[Dict[str, str]]): Additional Spark configuration options, given as a
                                dictionary of option names to values. Default is None.

    Raises:
        ValueError: If the specified environment is not supported or if configuration parameters are invalid.

    Examples:
        >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
        >>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
        Spark session configured for environment: local

        >>> HariSparkSessionManager().configure(env='local', configs_path=None)
        Spark session is already configured. Skipping reconfiguration.

        >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
        >>> HariSparkSessionManager().configure(env='123')
        Traceback (most recent call last):
            ...
        ValueError: Unsupported environment '123'. Supported environments: ['local'].

    """

    if self.is_configured:
        print(
            'Spark session is already configured. Skipping reconfiguration.'
        )
        return

    configs = {}

    if configs_path:
        # Best effort: any failure to locate or parse the file falls back
        # to the arguments passed by the caller.
        try:
            configs_path = validate_path_exists(
                configs_path, 'configs_path'
            )['value']
            # Guard against a falsy result (e.g. if the helper returns
            # None for an empty file) so the lookups below cannot fail.
            configs = read_yaml_to_dict(configs_path) or {}

        except Exception as e:
            print(
                f'Error reading configs from {configs_path}: {e}. Using Provided Parameters.'
            )

    # File settings win over the explicit arguments.
    app_name = configs.get('app_name', app_name)
    log_level = configs.get('log_level', log_level)
    master_url = configs.get('master_url', master_url)
    jars_path = configs.get('jars_path', jars_path)

    if not self._logger:
        logger_manager.configure(
            app_name=app_name,
            log_level=log_level,
        )

    self._logger = logger_manager.get_logger()['logger']

    # Validate and normalise the environment name once; factory keys are
    # lower-case.
    env_key = validate_non_empty_string(env, 'env')['value'].lower()
    factory_class = self.__factories.get(env_key)

    if not factory_class:
        raise ValueError(
            f"Unsupported environment '{env_key}'. Supported environments: {list(self.__factories.keys())}."
        )

    factory: BaseHariSparkSession = factory_class(
        app_name=app_name,
        master_url=master_url,
        spark_log_level=log_level,
        jars_path=jars_path,
    )

    spark: Dict[str, SparkSession] = factory.create_spark_session(
        spark_extras=spark_extras
    )

    self._spark_session = spark.get('spark_session')
    print(f'Spark session configured for environment: {env_key}')

get_spark_session()

Get the configured Spark session.

Returns:

Type Description
Dict[str, SparkSession]

Dict[str, SparkSession]: A dictionary containing the SparkSession instance.

Raises:

Type Description
ValueError

If the Spark session has not been configured.

Examples:

>>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
Spark session configured for environment: local
>>> HariSparkSessionManager().get_spark_session()
{'spark_session': <pyspark.sql.session.SparkSession object at 0x...>}
>>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
>>> HariSparkSessionManager().get_spark_session()
Traceback (most recent call last):
    ...
ValueError: Spark session is not configured. Please call 'configure' first.
Source code in hari_data/session/hari_spark_session_manager.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def get_spark_session(self) -> Dict[str, SparkSession]:
    """
    Return the Spark session held by the manager.

    Returns:
        Dict[str, SparkSession]: A one-entry dictionary whose 'spark_session'
            key holds the configured session.

    Raises:
        ValueError: If no Spark session has been configured yet.

    Examples:
        >>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
        Spark session configured for environment: local
        >>> HariSparkSessionManager().get_spark_session()
        {'spark_session': <pyspark.sql.session.SparkSession object at 0x...>}

        >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
        >>> HariSparkSessionManager().get_spark_session()
        Traceback (most recent call last):
            ...
        ValueError: Spark session is not configured. Please call 'configure' first.
    """
    # Happy path first: hand back the session wrapped in its dictionary.
    if self.is_configured:
        return {'spark_session': self._spark_session}
    raise ValueError(
        "Spark session is not configured. Please call 'configure' first."
    )

stop_spark_session()

Stop the Spark session and reset the manager.

Examples:

>>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
>>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
Spark session configured for environment: local
>>> HariSparkSessionManager().stop_spark_session()
Spark session stopped and manager reset.
>>> HariSparkSessionManager().stop_spark_session()
Source code in hari_data/session/hari_spark_session_manager.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def stop_spark_session(self) -> None:
    """
    Stop the Spark session and reset the manager.

    Does nothing when no session is configured. Otherwise the session is
    stopped, the stored session is cleared on both the instance and the
    class, and the shared instance is released so that the next
    ``HariSparkSessionManager()`` call builds a fresh manager.

    Examples:
        >>> HariSparkSessionManager._instance = None; HariSparkSessionManager._spark_session = None
        >>> HariSparkSessionManager().configure(env='local',configs_path=None,app_name='MyApp', master_url='local[*]', jars_path=None)
        Spark session configured for environment: local
        >>> HariSparkSessionManager().stop_spark_session()
        Spark session stopped and manager reset.
        >>> HariSparkSessionManager().stop_spark_session()
    """
    if not self.is_configured:
        return
    self._spark_session.stop()

    manager_cls = type(self)
    self._spark_session = None
    # Also clear a session stored directly on the class, otherwise a
    # fresh instance would still report itself as configured.
    manager_cls._spark_session = None
    # Assigning through ``self`` would only shadow the class attribute;
    # the singleton has to be released on the class itself. Leave it alone
    # if this object is no longer the registered instance.
    if manager_cls._instance is self:
        manager_cls._instance = None
    print('Spark session stopped and manager reset.')